from __future__ import division, print_function, absolute_import
import abc
import warnings
import numpy as np
import pandas as pd
from h2o.frame import H2OFrame
from numpy.random import choice
from sklearn.externals import six
from sklearn.neighbors import NearestNeighbors
from skutil.base import overrides, BaseSkutil
from ..utils.fixes import dict_keys
from ..utils import *
# Public API of this module: the three class balancers plus the warning
# category used to flag sub-optimal sampling.
__all__ = [
    'OversamplingClassBalancer',
    'SamplingWarning',
    'SMOTEClassBalancer',
    'UndersamplingClassBalancer',
]
class SamplingWarning(UserWarning):
    """Warning category signalling that sub-optimal sampling has occurred.

    For instance, oversampling a minority class that contains only one
    observation causes this warning to be issued.
    """
def _validate_ratio(ratio):
# validate ratio, if the current ratio is >= the ratio, it's "balanced enough"
if not isinstance(ratio, (float, np.float)) or ratio <= 0 or ratio > 1:
raise ValueError('ratio should be a float between 0.0 and 1.0, but got %s' % str(ratio))
return ratio
def _validate_target(y):
    """Ensure the target ``y`` is a non-empty column name.

    Parameters
    ----------
    y : str
        The name of the target column.

    Returns
    -------
    y : str
        ``y`` passed through ``str``.

    Raises
    ------
    ValueError
        If ``y`` is falsy or is not an instance of ``six.string_types``.
    """
    # truthiness is evaluated before the type check
    if y and isinstance(y, six.string_types):
        return str(y)  # force string
    raise ValueError('y must be a column name')
def _validate_num_classes(cts):
    """Check that the number of classes lies in ``[2, BalancerMixin._max_classes]``.

    Parameters
    ----------
    cts : object exposing ``shape``
        The per-class counts; ``cts.shape[0]`` is taken as the number
        of distinct classes.

    Returns
    -------
    n_classes : int
        The number of classes found.

    Raises
    ------
    ValueError
        If there are more than ``BalancerMixin._max_classes`` classes or
        fewer than 2.
    """
    upper = BalancerMixin._max_classes
    n_classes = cts.shape[0]

    if n_classes > upper:
        raise ValueError('class balancing can only handle <= %i classes, but got %i' % (upper, n_classes))
    if n_classes < 2:
        raise ValueError('class balancing requires at least 2 classes')

    return n_classes
def _validate_x_y_ratio(X, y, ratio):
    """Validate ``y`` and ``ratio`` and summarize the class counts of ``X[y]``.

    ``X`` is expected to already be a validated pandas ``DataFrame``.
    The following is checked:

      1. That ratio is a float that falls between 0.0 (exclusive) and
         1.0 (inclusive)
      2. That y is a non-empty string
      3. That the number of classes is at least 2 and does not exceed
         ``_max_classes`` as defined by the BalancerMixin class

    Parameters
    ----------
    X : Pandas ``DataFrame``, shape=(n_samples, n_features)
        The frame from which to sample

    y : str
        The name of the column that is the response class.
        This is the column on which ``value_counts`` will be
        executed to determine imbalance.

    ratio : float
        The ratio at which the balancing operation will
        be performed. Used to determine whether balancing is
        required.

    Returns
    -------
    out_tup : tuple, shape=(5,)
        a length-5 tuple with the following args:

            [0] - cts (dict), maps each class label to its count
            [1] - index (pd.Index), the class labels sorted ascending
                  by their count
            [2] - target (np.ndarray), the values of the target column
            [3] - n_classes (int), the number of unique classes
            [4] - needs_balancing (bool), whether the least populated class
                  is represented at a rate lower than the demanded ratio.

        When the target column has ``object`` dtype, both the labels and
        the target values are cast to ``str``.
    """
    ratio = _validate_ratio(ratio)
    y = _validate_target(y)  # force to string
    is_factor = X.dtypes[y] == 'object'

    # class counts, least populous first; also validates the class count
    cts = X[y].value_counts().sort_values(ascending=True)
    n_classes = _validate_num_classes(cts)

    counts = cts.values
    needs_balancing = (counts[0] / counts[-1]) < ratio

    if is_factor:
        index = cts.index.astype('str')
        target = X[y].astype('str').values
    else:
        index = cts.index
        target = X[y].values

    label_counts = dict(zip(index.values, counts))
    return label_counts, index, target, n_classes, needs_balancing
class BalancerMixin:
    """Mixin for balancers: declares the ``balance`` interface and
    the shared constants ``_max_classes`` (20) and ``_def_ratio`` (0.2).
    """

    # the greatest number of distinct classes a class balancer will handle
    _max_classes = 20

    # the ratio used by balancers when none is specified
    _def_ratio = 0.2

    def balance(self, X):
        """Balance ``X``. Subclasses must provide the implementation;
        this base version always raises.

        Parameters
        ----------
        X : Pandas ``DataFrame`` or ``H2OFrame``, shape=(n_samples, n_features)
            The frame from which to balance

        Raises
        ------
        NotImplementedError
            Always.
        """
        raise NotImplementedError('this method must be implemented by a subclass')
def _default_indices(length, shuffle):
x = np.arange(length)
return x.tolist() if not shuffle else np.random.permutation(x).tolist()
class _BaseBalancePartitioner(six.with_metaclass(abc.ABCMeta, object)):
    """Abstract base for sample partitioners. A concrete partitioner
    supplies ``_get_sample_indices``, which holds the logic deciding
    which rows are sampled; ``get_indices`` simply delegates to it and
    returns those row indices (if using with H2O, these should be sorted).

    Parameters
    ----------
    X : Pandas ``DataFrame`` or ``H2OFrame``, shape=(n_samples, n_features)
        The frame from which to sample

    y_name : str
        The name of the column that is the response class

    ratio : float
        The ratio at which to sample

    validation_function : callable, optional (default=_validate_x_y_ratio)
        Called as ``validation_function(X, y_name, ratio)``; must return a
        5-tuple of (class counts, sorted labels, target values, number of
        classes, needs-balancing flag). This function differs for H2OFrames.
    """

    @abc.abstractmethod
    def __init__(self, X, y_name, ratio, validation_function=_validate_x_y_ratio):
        self.X = X
        self.y = y_name
        self.ratio = ratio

        # run the validation; the number of classes (4th item) is not stored
        validated = validation_function(X, y_name, ratio)
        self.cts, self.index, self.target_col, _, self.needs_balancing = validated

    def get_indices(self, shuffle):
        """Return the row indices to sample, as computed by the subclass.

        Parameters
        ----------
        shuffle : bool
            Forwarded to ``_get_sample_indices``.
        """
        return self._get_sample_indices(shuffle)

    @abc.abstractmethod
    def _get_sample_indices(self, shuffle):
        """Compute the row indices to sample. Must be overridden."""
        raise NotImplementedError('must be overridden by subclass!')
class _OversamplingBalancePartitioner(_BaseBalancePartitioner):
    """Partitioner that oversamples (with replacement) every minority
    class that falls short of ``ratio`` relative to the majority class.
    """

    def __init__(self, X, y_name, ratio, validation_function=_validate_x_y_ratio):
        super(_OversamplingBalancePartitioner, self).__init__(
            X, y_name, ratio, validation_function)

    @overrides(_BaseBalancePartitioner)
    def _get_sample_indices(self, shuffle):
        """Return every original row index plus the resampled minority indices.

        Parameters
        ----------
        shuffle : bool
            If True the result is randomly permuted; otherwise it is
            sorted ascending.

        Returns
        -------
        list
            Row positions into ``self.X``; oversampled rows appear
            more than once.
        """
        n_rows = self.X.shape[0]

        # nothing to balance: hand back the plain row positions
        if not self.needs_balancing:
            return _default_indices(n_rows, shuffle)

        counts, ratio, labels = self.cts, self.ratio, self.target_col

        # labels are sorted ascending by count, so the last is the majority
        majority = self.index[-1]
        majority_ct = counts[majority]
        n_required = np.maximum(1, int(ratio * majority_ct))

        row_ids = np.arange(n_rows)
        extra = []

        for label in self.index:
            # reaching the majority label means all minorities are done
            if label == majority:
                break

            label_ct = counts[label]
            if label_ct == 1:
                warnings.warn('class %s only has one observation' % str(label), SamplingWarning)

            # skip classes already at the ratio, or for which the floor of
            # one required record leaves nothing to add
            n_missing = n_required - label_ct
            if label_ct / majority_ct >= ratio or n_missing <= 0:
                continue

            candidates = row_ids[labels == label]
            extra.extend(choice(candidates, n_missing, replace=True))

        combined = list(row_ids) + extra

        # sorted because h2o doesn't play nicely with random indexing
        if shuffle:
            return list(np.random.permutation(combined))
        return sorted(combined)
class _UndersamplingBalancePartitioner(_BaseBalancePartitioner):
    """Balance partitioner for undersampling the majority class."""

    def __init__(self, X, y_name, ratio, validation_function=_validate_x_y_ratio):
        super(_UndersamplingBalancePartitioner, self).__init__(
            X, y_name, ratio, validation_function)

    @overrides(_BaseBalancePartitioner)
    def _get_sample_indices(self, shuffle):
        """Return the indices of all minority rows plus a random subset of
        the majority rows.

        The majority class is cut down to ``int((1 / ratio) * n_next)``
        rows, where ``n_next`` is the count of the second-most-populous
        class. If the majority class is already that small, or no balancing
        is needed, every row index is returned.

        Parameters
        ----------
        shuffle : bool
            If True the result is randomly permuted; otherwise it is
            sorted ascending. This is honored on every return path.

        Returns
        -------
        list
            Row positions into ``self.X``.
        """
        n_rows = self.X.shape[0]

        # if we don't need balancing, then just return the indices as is
        if not self.needs_balancing:
            return _default_indices(n_rows, shuffle)

        cts = self.cts

        # labels are sorted ascending by count: last is the majority and the
        # one before it is the next-most-populous (validation guarantees at
        # least two classes)
        majority = self.index[-1]
        next_most = self.index[-2]

        # i.e., if ratio == 0.5 and next_most == 30, n_required = 60
        n_required = int((1 / self.ratio) * cts[next_most])

        # Exit condition: the majority class is already <= n_required, so no
        # rows are dropped. Use the same helper as the no-balancing path so
        # that ``shuffle`` is respected here as well.
        if cts[majority] <= n_required:
            return _default_indices(n_rows, shuffle)

        # otherwise keep only n_required randomly chosen majority rows
        all_indices = np.arange(n_rows)
        target_col = self.target_col
        majority_recs = all_indices[target_col == majority]
        kept_majority = choice(majority_recs, n_required, replace=False)

        # all the "minority" observations, plus the sampled majority ones
        kept = list(all_indices[target_col != majority])
        kept.extend(kept_majority)

        if shuffle:
            return [j for j in np.random.permutation(kept)]
        return sorted(kept)
class _BaseBalancer(six.with_metaclass(abc.ABCMeta, BaseSkutil, BalancerMixin)):
    """Common parent of the balancer classes. Balancers are not like
    TransformerMixins or BaseEstimators, and do not implement fit or
    predict, because balancing is ONLY applied to training data. This
    class merely stores the shared constructor arguments.

    Parameters
    ----------
    y : str
        The name of the response column.

    ratio : float, optional (default=0.2)
        The target ratio of the minority records to the majority records.

    shuffle : bool, optional (default=True)
        Whether or not to shuffle rows on return

    as_df : bool, optional (default=True)
        Whether ``balance`` should return a Pandas ``DataFrame``
        rather than a Numpy ``ndarray``.
    """

    def __init__(self, y, ratio=BalancerMixin._def_ratio, shuffle=True, as_df=True):
        # note: the response column name is stored under ``y_``
        self.y_ = y
        self.ratio = ratio
        self.shuffle = shuffle
        self.as_df = as_df
def _over_under_balance(X, y, ratio, as_df, shuffle, partitioner_class):
    """Shared implementation of the over- and under-sampling balancers.

    Parameters
    ----------
    X : Pandas ``DataFrame``, shape=(n_samples, n_features)
        The data to balance. It is first passed through ``validate_is_pd``.

    y : str
        The name of the response column.

    ratio : float
        The target ratio handed to the partitioner.

    as_df : bool
        Whether to return a Pandas ``DataFrame`` (True) or a Numpy
        ``ndarray`` (False).

    shuffle : bool
        Forwarded to the partitioner's ``get_indices``.

    partitioner_class : class
        Instantiated as ``partitioner_class(X, y, ratio)``; its
        ``get_indices(shuffle)`` must return positional row indices.

    Returns
    -------
    balanced : Pandas ``DataFrame`` or Numpy ``ndarray``
        The selected rows, re-indexed from 0.
    """
    # check on state of X
    # NOTE(review): whether validate_is_pd returns a copy is not visible
    # here; if it does not, the index reset below mutates the caller's frame.
    X, _ = validate_is_pd(X, None)  # there are no cols, and we don't want warnings

    # since we rely on indexing X, we need to reset indices
    # in case X is the result of a slice and they're out of order.
    X.index = np.arange(X.shape[0])

    # the balancing is handled in the partitioner
    partitioner = partitioner_class(X, y, ratio)
    balanced = X.iloc[partitioner.get_indices(shuffle)]

    # we need to re-index...
    balanced.index = np.arange(balanced.shape[0])

    # ``DataFrame.as_matrix`` has been removed from pandas; ``.values``
    # yields the same ndarray and exists in old and new releases alike.
    return balanced if as_df else balanced.values
class OversamplingClassBalancer(_BaseBalancer):
    """Oversample all of the minority classes until they are
    represented at the target proportion to the majority class.

    Parameters
    ----------
    y : str
        The name of the response column. The column may hold several
        minority classes (see the example below).

    ratio : float, optional (default=0.2)
        The target ratio of the minority records to the majority records. If the
        existing ratio is >= the provided ratio, no records are added.

    shuffle : bool, optional (default=True)
        Whether or not to shuffle rows on return

    as_df : bool, optional (default=True)
        Whether to return a Pandas ``DataFrame`` from the ``balance``
        method. If False, will return a Numpy ``ndarray`` instead.
        Since most skutil transformers depend on explicitly-named
        ``DataFrame`` features, the ``as_df`` parameter is True by default.

    Examples
    --------
    Consider the following example: with a ``ratio`` of 0.5, the
    minority classes (1, 2) will be oversampled until they are represented
    at a ratio of at least 0.5 * the prevalence of the majority class (0)

        >>> import pandas as pd
        >>> import numpy as np
        >>>
        >>> # 100 zeros, 30 ones and 25 twos
        >>> X = pd.DataFrame(np.concatenate([np.zeros(100), np.ones(30), np.ones(25)*2]), columns=['A'])
        >>> sampler = OversamplingClassBalancer(y="A", ratio=0.5)
        >>>
        >>> X_balanced = sampler.balance(X)
        >>> X_balanced['A'].value_counts().sort_index()
        0.0    100
        1.0     50
        2.0     50
        Name: A, dtype: int64
    """

    def __init__(self, y, ratio=BalancerMixin._def_ratio, shuffle=True, as_df=True):
        super(OversamplingClassBalancer, self).__init__(ratio=ratio, y=y,
                                                        shuffle=shuffle,
                                                        as_df=as_df)

    @overrides(BalancerMixin)
    def balance(self, X):
        """Apply the oversampling balance operation. Oversamples
        the minority class(es) to the provided ratio of minority
        class : majority class

        Parameters
        ----------
        X : Pandas ``DataFrame``, shape=(n_samples, n_features)
            The data to balance.

        Returns
        -------
        blnc : pandas ``DataFrame`` or Numpy ``ndarray``
            The balanced data (an ``ndarray`` when ``self.as_df`` is
            False). The rows will be explicitly shuffled if
            ``self.shuffle`` is True however, if ``self.shuffle`` is
            False, preservation of original, natural ordering is not
            guaranteed.
        """
        # all of the work is delegated to the shared helper, parameterized
        # with the oversampling partitioner
        blnc = _over_under_balance(X=X, y=self.y_, ratio=self.ratio,
                                   shuffle=self.shuffle, as_df=self.as_df,
                                   partitioner_class=_OversamplingBalancePartitioner)
        return blnc
class SMOTEClassBalancer(_BaseBalancer):
    """Balance a matrix with the SMOTE (Synthetic Minority Oversampling TEchnique)
    method. This will generate synthetic samples for the minority class(es) using
    K-nearest neighbors

    Parameters
    ----------
    y : str
        The name of the response column. The column may hold several
        minority classes (see the example below).

    ratio : float, optional (default=0.2)
        The target ratio of the minority records to the majority records. If the
        existing ratio is >= the provided ratio, no records are added,
        otherwise SMOTE will impute records until the target ratio is reached.

    shuffle : bool, optional (default=True)
        Whether or not to shuffle rows on return

    k : int, optional (default=3)
        The number of neighbors to use in the nearest neighbors model

    as_df : bool, optional (default=True)
        Whether to return a Pandas ``DataFrame`` from the ``balance``
        method. If False, will return a Numpy ``ndarray`` instead.
        Since most skutil transformers depend on explicitly-named
        ``DataFrame`` features, the ``as_df`` parameter is True by default.

    Examples
    --------
    Consider the following example: with a ``ratio`` of 0.5, the
    minority classes (1, 2) will be oversampled until they are represented
    at a ratio of at least 0.5 * the prevalence of the majority class (0)

        >>> import pandas as pd
        >>> import numpy as np
        >>> from numpy.random import RandomState
        >>>
        >>> # establish a random state
        >>> prng = RandomState(42)
        >>>
        >>> # 100 zeros, 30 ones and 25 twos
        >>> X = pd.DataFrame(np.asarray([prng.rand(155),
        ...                              np.concatenate([np.zeros(100), np.ones(30), np.ones(25)*2])]).T,
        ...                  columns=['x', 'y'])
        >>> sampler = SMOTEClassBalancer(y="y", ratio=0.5)
        >>>
        >>> X_balanced = sampler.balance(X)
        >>> X_balanced['y'].value_counts().sort_index()
        0.0    100
        1.0     50
        2.0     50
        Name: y, dtype: int64
    """

    def __init__(self, y, ratio=BalancerMixin._def_ratio, shuffle=True, k=3, as_df=True):
        super(SMOTEClassBalancer, self).__init__(ratio=ratio, y=y,
                                                 shuffle=shuffle,
                                                 as_df=as_df)
        self.k = k

    @overrides(BalancerMixin)
    def balance(self, X):
        """Apply the SMOTE balancing operation. Oversamples
        the minority class(es) to the provided ratio of minority
        class : majority class. Each synthetic record is the mean of the
        k-nearest neighbors of a sampled minority point.

        Parameters
        ----------
        X : Pandas ``DataFrame``, shape=(n_samples, n_features)
            The data to balance.

        Returns
        -------
        X : pandas ``DataFrame`` or Numpy ``ndarray``
            The balanced data (an ``ndarray`` when ``self.as_df`` is
            False, including when no balancing was required). The rows
            will be explicitly shuffled if ``self.shuffle`` is True
            however, if ``self.shuffle`` is False, preservation of
            original, natural ordering is not guaranteed.

        Raises
        ------
        ValueError
            If a minority class has only one observation.
        """
        # check on state of X
        X, _ = validate_is_pd(X, None, assert_all_finite=True)  # there are no cols, and we don't want warnings

        # since we rely on indexing X, we need to reset indices
        # in case X is the result of a slice and they're out of order.
        X.index = np.arange(0, X.shape[0])

        ratio = self.ratio
        cts, index, target_col, n_classes, needs_balancing = _validate_x_y_ratio(X, self.y_, ratio)

        # Only synthesize when required. The shuffle / as_df handling below is
        # shared by both paths so that ``as_df=False`` is honored even when
        # the data is already balanced.
        if needs_balancing:
            # labels are sorted ascending by count: the last is the majority
            majority = index[-1]
            n_required = np.maximum(1, int(ratio * cts[majority]))

            for minority in index:
                # since it's sorted, it means we've hit the end
                if minority == majority:
                    break

                min_ct = cts[minority]
                if min_ct == 1:
                    raise ValueError('cannot perform SMOTE on only one observation (class=%s)' % str(minority))

                current_ratio = min_ct / cts[majority]
                if current_ratio >= ratio:
                    continue  # if ratio is already met, continue

                # the difference between the number present and the number we need
                n_samples = n_required - min_ct

                # the np maximum can cause weirdness
                if n_samples <= 0:
                    continue  # move onto next class

                # randomly select n_samples points from the minority records;
                # replacement is only used when more points are required than
                # are available
                minority_recs = X[X[self.y_] == minority]
                replace = n_samples > minority_recs.shape[0]
                idcs = choice(minority_recs.index, n_samples, replace=replace)
                pts = X.iloc[idcs].drop([self.y_], axis=1)

                # Fit the neighbors model on the random points. K is not
                # validated here; that is left to NearestNeighbors.
                nn = NearestNeighbors(n_neighbors=self.k).fit(pts)

                # do imputation: one synthetic record per sampled point, being
                # the mean of the neighbors returned for that point
                synthetics_pts = [pts.iloc[neighbors].mean().tolist()
                                  for neighbors in nn.kneighbors()[1]]

                # build the synthetic frame and add the minority target to it
                syn_frame = pd.DataFrame.from_records(data=synthetics_pts, columns=pts.columns)
                syn_frame[self.y_] = np.array([minority] * syn_frame.shape[0])

                # reorder the columns
                syn_frame = syn_frame[X.columns]

                # append to X
                X = pd.concat([X, syn_frame])

                # reset index
                X.index = np.arange(X.shape[0])

        # shuffle if necessary
        if self.shuffle:
            X = shuffle_dataframe(X)

        # ``DataFrame.as_matrix`` has been removed from pandas; ``.values``
        # yields the same ndarray and exists in old and new releases alike.
        return X if self.as_df else X.values
class UndersamplingClassBalancer(_BaseBalancer):
    """Undersample the majority class until it is represented
    at the target proportion to the most-represented minority class
    (i.e., the second-most populous class).

    Parameters
    ----------
    y : str
        The name of the response column. The column may hold several
        minority classes (see the example below).

    ratio : float, optional (default=0.2)
        The target ratio of the minority records to the majority records. If the
        existing ratio is >= the provided ratio, no records are dropped.

    shuffle : bool, optional (default=True)
        Whether or not to shuffle rows on return

    as_df : bool, optional (default=True)
        Whether to return a Pandas ``DataFrame`` from the ``balance``
        method. If False, will return a Numpy ``ndarray`` instead.
        Since most skutil transformers depend on explicitly-named
        ``DataFrame`` features, the ``as_df`` parameter is True by default.

    Examples
    --------
    Consider the following example: with a ``ratio`` of 0.5, the
    majority class (0) will be undersampled until the second most-populous
    class (1) is represented at a ratio of 0.5.

        >>> import pandas as pd
        >>> import numpy as np
        >>>
        >>> # 150 zeros, 30 ones and 10 twos
        >>> X = pd.DataFrame(np.concatenate([np.zeros(150), np.ones(30), np.ones(10)*2]), columns=['A'])
        >>> sampler = UndersamplingClassBalancer(y="A", ratio=0.5)
        >>>
        >>> X_balanced = sampler.balance(X)
        >>> X_balanced['A'].value_counts().sort_index()
        0.0    60
        1.0    30
        2.0    10
        Name: A, dtype: int64
    """

    # the default ratio comes from the mixin constant (0.2), as in the
    # sibling balancers, rather than a separate hard-coded literal
    def __init__(self, y, ratio=BalancerMixin._def_ratio, shuffle=True, as_df=True):
        super(UndersamplingClassBalancer, self).__init__(ratio=ratio, y=y,
                                                         shuffle=shuffle,
                                                         as_df=as_df)

    @overrides(BalancerMixin)
    def balance(self, X):
        """Apply the undersampling balance operation. Undersamples
        the majority class to the provided ratio over the second-most-
        populous class label.

        Parameters
        ----------
        X : Pandas ``DataFrame``, shape=(n_samples, n_features)
            The data to balance.

        Returns
        -------
        blnc : pandas ``DataFrame`` or Numpy ``ndarray``
            The balanced data (an ``ndarray`` when ``self.as_df`` is
            False). The rows will be explicitly shuffled if
            ``self.shuffle`` is True however, if ``self.shuffle`` is
            False, preservation of original, natural ordering is not
            guaranteed.
        """
        # all of the work is delegated to the shared helper, parameterized
        # with the undersampling partitioner
        blnc = _over_under_balance(X=X, y=self.y_, ratio=self.ratio,
                                   shuffle=self.shuffle, as_df=self.as_df,
                                   partitioner_class=_UndersamplingBalancePartitioner)
        return blnc