# -*- coding: utf-8 -*-
#
# Author: Taylor Smith <taylor.smith@alkaline-ml.com>
#
# Classes and functions for transformers that rectify skewness in features.

import numpy as np
from abc import ABCMeta, abstractmethod

from scipy import optimize
from scipy.stats import boxcox

import six
from joblib import Parallel, delayed
from sklearn.utils.validation import check_is_fitted

from ..base import BasePDTransformer
from ..decorators import suppress_warnings as suppress
from ..utils.validation import (check_dataframe, validate_multiple_rows,
                                validate_test_set_columns)
from ..utils.dataframe import dataframe_or_array
from ..utils.metaestimators import timed_instance_method

__all__ = [
    'BoxCoxTransformer',
    'YeoJohnsonTransformer'
]

# tolerance below which a float (e.g., an estimated lambda) is
# treated as zero throughout this module
ZERO = 1E-16


def _bc_est_lam(y, min_value, dtype, suppress_warnings):
    """Estimate the lambda param for Box-Cox transformations.

    Estimate lambda for a single y via scipy's maximum-likelihood
    routine. No validation is performed on the input.

    Parameters
    ----------
    y : np.ndarray, shape=(n_samples,)
        The vector from which lambda is being estimated

    min_value : float
        The floor value; elements of ``y`` below this are raised to it,
        since Box-Cox cannot handle zeros or negatives.

    dtype : type
        The float type to which ``y`` is cast.

    suppress_warnings : bool
        Whether to suppress warnings raised within ``scipy.stats.boxcox``.
    """
    # ensure y is an array, floored at min_value
    y = np.maximum(np.asarray(y).astype(dtype), min_value)

    # Use scipy's maximum-likelihood estimator to find the optimal
    # lambda (the inner optimization routine can be fairly verbose
    # with warnings)
    def _boxcox_inner():
        return boxcox(y, lmbda=None)

    # only if we want to suppress the warnings, decorate now
    if suppress_warnings:
        _boxcox_inner = suppress(_boxcox_inner)
    b = _boxcox_inner()

    # boxcox returns (transformed_data, lmbda); return the lambda
    # that maximized the log-likelihood
    return b[1]
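

# A doctest-style sketch of the estimator above (illustrative only;
# the exact lambda depends on the sample and the scipy version, so
# only its scalar-ness is checked):
#
#     >>> rs = np.random.RandomState(42)
#     >>> skewed = rs.exponential(scale=2., size=100)
#     >>> lam = _bc_est_lam(skewed, 1e-12, np.float64, True)
#     >>> np.isscalar(lam) or np.ndim(lam) == 0
#     True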


def _yj_est_lam(y, brack, dtype=np.float32):
    """Estimate the lambda param for Yeo-Johnson transformations.

    Estimate lambda for a single y via maximum likelihood, using
    Brent's method to minimize the negative log-likelihood over the
    bracketing interval. No validation is performed on the input.

    Parameters
    ----------
    y : array-like, shape=(n_samples,)
        The vector from which lambda is being estimated

    brack : tuple
        The starting interval for the downhill bracket search.

    dtype : type, optional (default=np.float32)
        The float type to which ``y`` is cast.
    """
    y = np.asarray(y).astype(dtype)

    def _eval_mle(lmb, data):
        # negative log-likelihood; Brent minimizes, so the MLE maximizes
        return -_yj_llf(data, lmb)

    # Suppress the invalid scalar warnings we might get in the
    # optimization routine.
    @suppress
    def _brent_optimize():
        return optimize.brent(_eval_mle, brack=brack, args=(y,))

    return _brent_optimize()
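

# A similar sketch for the YJ estimator (illustrative only; the
# recovered lambda is data-dependent):
#
#     >>> rs = np.random.RandomState(0)
#     >>> v = rs.normal(size=200) ** 3  # symmetric but heavy-tailed
#     >>> lam = _yj_est_lam(v, brack=(-2, 2))
#     >>> np.isfinite(lam)
#     True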


def _yj_llf(data, lmb):
    """Compute the Yeo-Johnson log-likelihood.

    Transform a y vector given a single lambda value, and compute
    the log-likelihood function of the transformed data. No
    validation is applied to the input.

    Parameters
    ----------
    data : array-like
       The vector to transform

    lmb : scalar
       The lambda value
    """
    # make into a numpy array, if not already one
    data = np.asarray(data)
    n = data.shape[0]

    # transform the vector
    y_trans = _yj_transform_y(data, lmb)

    # We can't take the canonical log of the data, as there could be
    # zeros or negatives. Thus, we need to shift both distributions
    # up by some arbitrary factor just for the LLF computation
    min_d, min_y = np.min(data), np.min(y_trans)
    if min_d < ZERO:
        shift = np.abs(min_d) + 1
        # shift out-of-place so the caller's array is not mutated
        # between successive optimizer evaluations
        data = data + shift

    # Same goes for y (a local copy, so in-place is fine)
    if min_y < ZERO:
        shift = np.abs(min_y) + 1
        y_trans += shift

    # Compute mean on potentially shifted data
    y_mean = np.mean(y_trans, axis=0)
    var = np.sum((y_trans - y_mean) ** 2. / n, axis=0)

    # If var is 0.0, we'd get a divide-by-zero warning from the log.
    # It means all the values in y were nearly identical, so return
    # NaN so we don't optimize for this value of lambda
    if var == 0:
        return np.nan

    llf = (lmb - 1) * np.sum(np.log(data), axis=0)
    llf -= n / 2.0 * np.log(var)

    return llf
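

# For reference, the function above mirrors the Box-Cox profile
# log-likelihood, ``llf = (lambda - 1) * sum(log(y)) - n/2 * log(var)``,
# where ``var`` is the MLE (divide-by-n) variance of the transformed
# data: the first term is the log-Jacobian of the transformation, the
# second is the profiled Gaussian log-likelihood.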


def _yj_transform_y(y, lam, dtype=np.float32):
    """Apply the Yeo-Johnson transformation to a vector.

    Transform ``y`` given a pre-computed lambda, handling the four
    piecewise cases of the transformation via boolean masks.
    """
    # cast to a float array (``.astype`` copies, so the caller's array
    # is never mutated by the masked assignments below)
    y = np.asarray(y).astype(dtype)

    # need some different masks...
    gte_zero_mask = y >= 0
    lt_zero_mask = ~gte_zero_mask  # negative number

    # lambda "masks" (just scalar booleans...)
    lam_gt_zero = lam > ZERO
    lam_eq_zero = not lam_gt_zero  # because bound in (0, 2)
    lam_eq_two = lam == 2.  # max, because bound in (0, 2)
    lam_not_two = not lam_eq_two

    # Case 1: x >= 0 and lambda is not 0
    c1_mask = gte_zero_mask & lam_gt_zero
    y[c1_mask] = (((y[c1_mask] + 1.) ** lam) - 1.0) / lam

    # Case 2: x >= 0 and lambda IS 0
    c2_mask = gte_zero_mask & lam_eq_zero
    y[c2_mask] = np.log(y[c2_mask] + 1.)

    # Case 3: x < 0 and lambda is not two
    c3_mask = lt_zero_mask & lam_not_two
    two_min_lam = (2. - lam)
    y[c3_mask] = -(((-y[c3_mask] + 1.) ** two_min_lam) - 1.0) / two_min_lam

    # Case 4: x < 0 and lam == 2.
    c4_mask = lt_zero_mask & lam_eq_two
    y[c4_mask] = -np.log(-y[c4_mask] + 1.)

    # Note: a prior implementation mapped a Python function over each
    # element of y individually; the vectorized masking above is
    # equivalent but dramatically faster.

    return y
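

# A quick identity check on the piecewise cases (a sketch): at
# ``lam=1`` case 1 reduces to ((y + 1)^1 - 1)/1 = y and case 3 to
# -(((-y + 1)^1) - 1)/1 = y, so the transform is the identity map:
#
#     >>> v = np.array([-2., -1., 0., 1., 2.])
#     >>> np.allclose(_yj_transform_y(v.copy(), lam=1.), v)
#     True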


class _BaseSkewnessTransformer(six.with_metaclass(ABCMeta, BasePDTransformer)):
    """Base class housing the shared fit/transform logic for the
    Box-Cox and Yeo-Johnson transformers."""

    def __init__(self, cols, n_jobs, as_df, dtype):
        super(_BaseSkewnessTransformer, self).__init__(
            cols=cols, as_df=as_df)

        self.n_jobs = n_jobs
        self.dtype = dtype

    def _fit(self, X, estimation_function):
        # check on state of X and cols (all cols need to be finite!)
        X, cols = check_dataframe(X, cols=self.cols, assert_all_finite=True)

        # ensure enough rows
        validate_multiple_rows(self.__class__.__name__, X)

        # Now estimate the lambdas in parallel
        n_jobs = self.n_jobs
        dtype = self.dtype
        kwargs = self._estimator_kwargs()

        self.lambda_ = list(
            Parallel(n_jobs=n_jobs)(
                delayed(estimation_function)(X[i], dtype, **kwargs)
                for i in cols))

        # set the fit cols
        self.fit_cols_ = cols

        return self
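
    # Note: with ``n_jobs=1`` the parallel dispatch in ``_fit`` above
    # reduces to a plain comprehension, roughly (a sketch):
    #
    #     [estimation_function(X[c], dtype, **kwargs) for c in cols]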

    @abstractmethod
    def _transform_vector(self, y, lam):
        """Transform a single vector given its pre-estimated lambda.

        Implemented by the Box-Cox and Yeo-Johnson subclasses to apply
        their respective transformations.
        """

    def _estimator_kwargs(self):
        """Get the keyword args for the estimator functions.

        The Box-Cox and Yeo-Johnson estimators take different arguments
        for their estimating functions, so this allows subclasses to pass
        extra kwargs to the estimation function. Default is an empty
        dict, but it can be overridden by subclasses that need it.
        """
        return dict()

    def transform(self, X):
        """Apply the transformation to a dataframe.

        Parameters
        ----------
        X : pd.DataFrame, shape=(n_samples, n_features)
            The Pandas frame to transform. The operation will
            be applied to a copy of the input data, and the result
            will be returned.

        Returns
        -------
        X : pd.DataFrame or np.ndarray, shape=(n_samples, n_features)
            The operation is applied to a copy of ``X``,
            and the result set is returned.
        """
        check_is_fitted(self, 'lambda_')

        # check on state of X and cols
        X, _ = check_dataframe(X, cols=self.cols, assert_all_finite=True)

        # validate the test columns
        cols = self.fit_cols_
        validate_test_set_columns(cols, X.columns)

        # we don't care how many samples are in the test set; the
        # multiple-row requirement only applies to the fit/estimation
        # stage, not to transform
        lambdas_ = self.lambda_

        # do transformations
        for nm, lam in zip(cols, lambdas_):
            X[nm] = self._transform_vector(X[nm], lam)

        return dataframe_or_array(X, self.as_df)


# joblib needs to pickle the estimation routine, but instance methods
# and locally-defined functions (closures) are not picklable by the
# standard pickler. These callable wrapper classes carry the extra
# estimator parameters while keeping the fit/transform code abstracted
# in the base class.
class _BCEstimator(object):
    def __init__(self, min_val):
        self.min_val = min_val

    def __call__(self, y, dtype, suppress_warnings, **kwargs):
        return _bc_est_lam(y, self.min_val, dtype,
                           suppress_warnings)


class _YJEstimator(object):
    def __init__(self, brack):
        self.brack = brack

    def __call__(self, y, dtype, **kwargs):
        return _yj_est_lam(y, self.brack, dtype)


class BoxCoxTransformer(_BaseSkewnessTransformer):
    r"""Apply the Box-Cox transformation to select features in a dataframe.

    Estimate a lambda parameter for each feature, and transform it to a
    distribution more-closely resembling a Gaussian bell using the
    Box-Cox transformation.

    The Box-Cox transformation cannot handle zeros or negative values in
    :math:`y`. Skoot attempts to deal with this scenario by imposing a
    floor of ``min_value`` for any values that are <= 0.

    The transformation is defined as:

    :math:`y_{i} = \left\{\begin{matrix}
    \frac{y_{i}^\lambda - 1}{\lambda} & \textup{if } \lambda \neq 0, \\
    ln(y_{i}) & \textup{if } \lambda = 0
    \end{matrix}\right.`

    Parameters
    ----------
    cols : array-like, shape=(n_features,), optional (default=None)
        The names of the columns on which to apply the transformation.
        If no column names are provided, the transformer will be ``fit``
        on the entire frame. Note that the transformation will also only
        apply to the specified columns, and any other non-specified
        columns will still be present after transformation. Note that
        since this transformer can only operate on numeric columns, not
        explicitly setting the ``cols`` parameter may result in errors
        for categorical data.

    n_jobs : int, optional (default=1)
        The number of jobs to use for the computation. This works by
        estimating each of the feature lambdas in parallel. If -1, all
        CPUs are used. If 1 is given, no parallel computing code is used
        at all, which is useful for debugging. For n_jobs below -1,
        (n_cpus + 1 + n_jobs) are used. Thus for n_jobs = -2, all CPUs
        but one are used.

    as_df : bool, optional (default=True)
        Whether to return a Pandas ``DataFrame`` in the ``transform``
        method. If False, will return a NumPy ``ndarray`` instead.
        Since most skoot transformers depend on explicitly-named
        ``DataFrame`` features, the ``as_df`` parameter is True by
        default.

    min_value : float, optional (default=1e-12)
        The floor for values in the prescribed features. Values below
        this amount will be set to ``min_value``, since the Box-Cox
        transformation requires strictly positive inputs.

    dtype : type, optional (default=np.float32)
        The type of float to which to cast the vector. Default is
        float32 to avoid overflows.

    suppress_warnings : bool, optional (default=False)
        Whether to suppress warnings raised within the
        ``scipy.stats.boxcox`` function. Default is False.

    Attributes
    ----------
    lambda_ : list
        The lambda values corresponding to each feature

    fit_cols_ : list
        The list of column names on which the transformer was fit. This
        is used to validate the presence of the features in the test set
        during the ``transform`` stage.
    """

    def __init__(self, cols=None, n_jobs=1, as_df=True,
                 min_value=1e-12, dtype=np.float32,
                 suppress_warnings=False):

        super(BoxCoxTransformer, self).__init__(
            cols=cols, as_df=as_df, n_jobs=n_jobs, dtype=dtype)

        self.min_value = min_value
        self.suppress_warnings = suppress_warnings

    @timed_instance_method(attribute_name="fit_time_")
    def fit(self, X, y=None):
        """Fit the transformer.

        Parameters
        ----------
        X : pd.DataFrame, shape=(n_samples, n_features)
            The Pandas frame to fit. The frame will only be fit on the
            prescribed ``cols`` (see ``__init__``) or all of them if
            ``cols`` is None.

        y : array-like or None, shape=(n_samples,), optional (default=None)
            Pass-through for ``sklearn.pipeline.Pipeline``.
        """
        min_value = self.min_value
        return self._fit(X, estimation_function=_BCEstimator(min_value))

    def _transform_vector(self, vec, lam):
        # make a np array, making sure we've floored at min_value
        y = np.maximum(np.asarray(vec).astype(self.dtype), self.min_value)

        # if lam is not "zero", y gets the power treatment:
        if lam > ZERO:
            return (y ** lam - 1.) / lam

        # otherwise, it gets logged
        return np.log(y)

    def _estimator_kwargs(self):
        return dict(suppress_warnings=self.suppress_warnings)
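

# A usage sketch for the transformer above (the frame below is
# hypothetical; any strictly numeric DataFrame works):
#
#     >>> import pandas as pd
#     >>> X = pd.DataFrame({'a': [1., 2., 3., 4., 5.],
#     ...                   'b': [2., 4., 8., 16., 32.]})
#     >>> bc = BoxCoxTransformer(cols=['b'])  # only transform 'b'
#     >>> trans = bc.fit(X).transform(X)
#     >>> len(bc.lambda_) == 1  # one lambda per fit column
#     True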
class YeoJohnsonTransformer(_BaseSkewnessTransformer):
    r"""Apply the Yeo-Johnson transformation to a dataset.

    Estimate a lambda parameter for each feature, and transform it to a
    distribution more-closely resembling a Gaussian bell using the
    Yeo-Johnson transformation.

    The Yeo-Johnson transformation, unlike the
    :class:`BoxCoxTransformer`, allows for zero and negative values of
    :math:`y` and is defined as:

    :math:`y_{i} = \left\{\begin{matrix}
    ((y_{i} + 1)^\lambda - 1)/\lambda &
        \textup{if } \lambda \neq 0, y \geq 0 \\
    log(y_{i} + 1) & \textup{if } \lambda = 0, y \geq 0 \\
    -[(-y_{i} + 1)^{(2 - \lambda)} - 1]/(2 - \lambda) &
        \textup{if } \lambda \neq 2, y < 0 \\
    -log(-y_{i} + 1) & \textup{if } \lambda = 2, y < 0
    \end{matrix}\right.`

    Parameters
    ----------
    cols : array-like, shape=(n_features,), optional (default=None)
        The names of the columns on which to apply the transformation.
        If no column names are provided, the transformer will be ``fit``
        on the entire frame. Note that the transformation will also only
        apply to the specified columns, and any other non-specified
        columns will still be present after transformation. Note that
        since this transformer can only operate on numeric columns, not
        explicitly setting the ``cols`` parameter may result in errors
        for categorical data.

    n_jobs : int, optional (default=1)
        The number of jobs to use for the computation. This works by
        estimating each of the feature lambdas in parallel. If -1, all
        CPUs are used. If 1 is given, no parallel computing code is used
        at all, which is useful for debugging. For n_jobs below -1,
        (n_cpus + 1 + n_jobs) are used. Thus for n_jobs = -2, all CPUs
        but one are used.

    as_df : bool, optional (default=True)
        Whether to return a Pandas ``DataFrame`` in the ``transform``
        method. If False, will return a NumPy ``ndarray`` instead.
        Since most skoot transformers depend on explicitly-named
        ``DataFrame`` features, the ``as_df`` parameter is True by
        default.

    brack : tuple, optional (default=(-2, 2))
        Either a triple ``(xa, xb, xc)`` where ``xa < xb < xc`` and
        ``func(xb) < func(xa), func(xc)``, or a pair ``(xa, xb)`` to be
        used as a starting interval for a downhill bracket search.
        Providing the pair ``(xa, xb)`` does not always mean the
        obtained solution will satisfy ``xa <= x <= xb``.

    dtype : type, optional (default=np.float32)
        The type of float to which to cast the vector. Default is
        float32 to avoid overflows.

    Attributes
    ----------
    lambda_ : list
        The lambda values corresponding to each feature

    fit_cols_ : list
        The list of column names on which the transformer was fit. This
        is used to validate the presence of the features in the test set
        during the ``transform`` stage.
    """

    def __init__(self, cols=None, n_jobs=1, as_df=True,
                 brack=(-2, 2), dtype=np.float32):

        super(YeoJohnsonTransformer, self).__init__(
            cols=cols, as_df=as_df, n_jobs=n_jobs, dtype=dtype)

        self.brack = brack

    @timed_instance_method(attribute_name="fit_time_")
    def fit(self, X, y=None):
        """Fit the transformer.

        Parameters
        ----------
        X : pd.DataFrame, shape=(n_samples, n_features)
            The Pandas frame to fit. The frame will only be fit on the
            prescribed ``cols`` (see ``__init__``) or all of them if
            ``cols`` is None.

        y : array-like or None, shape=(n_samples,), optional (default=None)
            Pass-through for ``sklearn.pipeline.Pipeline``.
        """
        brack = self.brack
        return self._fit(X, estimation_function=_YJEstimator(brack))

    def _transform_vector(self, y, lam):
        return _yj_transform_y(y, lam, self.dtype)
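

# A usage sketch for the Yeo-Johnson variant, which (unlike Box-Cox)
# tolerates zeros and negatives directly (the column name and values
# below are hypothetical):
#
#     >>> import pandas as pd
#     >>> X = pd.DataFrame({'c': [-3., -1., 0., 2., 10.]})
#     >>> yj = YeoJohnsonTransformer(cols=['c'])
#     >>> trans = yj.fit(X).transform(X)
#     >>> trans.shape == X.shape
#     True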