from __future__ import division, absolute_import, print_function
from h2o.frame import H2OFrame
from ..utils.fixes import is_iterable, dict_keys
import pandas as pd
import numpy as np
import warnings
__all__ = [
'GainsStatisticalReport'
]
def _as_numpy(*args):
"""Given an iterable (a 1d list, np.ndarray, pd.Series,
pd.DataFrame or H2OFrame), convert it into a 1d np.ndarray
for further processing.
Returns
-------
    arrs : list or np.ndarray
        A list of 1d np.ndarrays of length ``len(args)``, or a single
        1d np.ndarray if only one arg is passed.
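
    Examples
    --------
    A minimal sketch of the intended behavior (hypothetical inputs):

    >>> a, b = _as_numpy([1, 2, 3], pd.Series([4., 5., 6.]))  # doctest: +SKIP
    >>> a.shape == b.shape == (3,)  # doctest: +SKIP
    True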
"""
def _single_as_numpy(x):
        if not isinstance(x, np.ndarray):
            # if an H2OFrame, just extract the first (and only) column
            if isinstance(x, H2OFrame):
                # same as ..h2o.util.h2o_col_to_numpy, but importing
                # that here would create a circular dependency
                if x.shape[1] != 1:
                    raise ValueError('expected a single column, but got shape=%s'
                                     % str(x.shape))
                _1d = x[x.columns[0]].as_data_frame(use_pandas=True)
                return _1d[_1d.columns[0]].values
elif is_iterable(x):
return np.asarray(x)
else:
raise TypeError('cannot create numpy array out of type=%s' % type(x))
else:
return np.copy(x)
arrs = [_single_as_numpy(i) for i in args]
if len(arrs) == 1:
arrs = arrs[0]
return arrs
class GainsStatisticalReport(object):
"""A class that computes actuarial statistics for scoring predictions
given prescribed weighting and loss data. Primarily intended for use with
``skutil.h2o.H2OGainsRandomizedSearchCV``.
Parameters
----------
    n_groups : int, optional (default=10)
        The number of quantile groups to use for the lift and gini
        computations.
    n_folds : int, optional (default=None)
        The number of cross validation folds being fit. If set,
        ``n_iter`` must also be set.
    n_iter : int, optional (default=None)
        The number of search iterations being fit. Together with
        ``n_folds``, determines how fold scores are aggregated in
        ``as_data_frame``.
    score_by : str, optional (default='lift')
        The metric returned by the ``score`` method. Must be one of
        ('lift', 'gini').
    iid : bool, optional (default=True)
        Whether samples are assumed to be identically distributed across
        folds. If True, mean fold scores are weighted by the number of
        test samples in each fold; otherwise each fold is weighted
        equally.
    error_score : float, optional (default=np.nan)
        The score to assign when ``pd.qcut`` raises an error due to
        non-unique bin edges.
    error_behavior : str, optional (default='warn')
        One of ('warn', 'raise', 'ignore'). How to handle non-unique
        bin edges in ``pd.qcut``.
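
    Examples
    --------
    Though primarily constructed internally by the grid search, a
    standalone fit looks roughly like this (hypothetical random data):

    >>> import numpy as np
    >>> rng = np.random.RandomState(42)
    >>> rep = GainsStatisticalReport(n_groups=10)
    >>> rep = rep.fit_fold(rng.rand(100), np.ones(100), rng.rand(100))
    >>> sorted(rep.as_data_frame().columns)
    ['gini', 'lift']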
"""
    # scores for maximizing metrics must be multiplied by -1,
    # since the grid search minimizes its loss function
_signs = {
'lift': -1,
'gini': -1
}
def __init__(self, n_groups=10, n_folds=None, n_iter=None,
score_by='lift', iid=True, error_score=np.nan,
error_behavior='warn'):
        self.n_groups = n_groups
self.score_by = score_by
met = dict_keys(self._signs)
self.stats = {m: [] for m in met}
self.sample_sizes = []
self.n_folds = n_folds
self.n_iter = n_iter
self.iid = iid
self.error_score = error_score
self.error_behavior = error_behavior
# validate score_by
if score_by not in self._signs:
raise ValueError('score_by must be in %s, but got %s'
% (', '.join(met), score_by))
        # if folds are being scored, we need n_iter to know
        # how to aggregate them later
if n_folds and not n_iter:
raise ValueError('if n_folds is set, must set n_iter')
    def as_data_frame(self):
"""Get the summary report of the fold fits in the
form of a pd.DataFrame.
Returns
-------
df : pd.DataFrame
A dataframe of summary statistics for each fold
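
        Examples
        --------
        For a report built with ``n_folds=2, n_iter=2`` and four stored
        fold scores, the frame has one row per iteration and aggregate
        columns per metric (a sketch; values depend on the fitted folds):

        >>> rep.as_data_frame().columns.tolist()  # doctest: +SKIP
        ['gini_max', 'gini_mean', 'gini_min', 'gini_std',
         'lift_max', 'lift_mean', 'lift_min', 'lift_std']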
"""
if not self.n_folds:
# if there were no folds, these are each individual scores
return pd.DataFrame.from_dict(self.stats)
else:
            # otherwise they are cross validation scores, and we expect
            # exactly n_folds * n_iter of them...
            n_obs, n_folds, n_iter = len(self.stats[dict_keys(self._signs)[0]]), self.n_folds, self.n_iter
            if n_obs != n_folds * n_iter:
                raise ValueError('expected n_folds * n_iter = %i scores, but got %i'
                                 % (n_folds * n_iter, n_obs))
new_stats = {}
for metric in dict_keys(self._signs):
new_stats['%s_mean' % metric] = [] # the mean scores
new_stats['%s_std' % metric] = [] # the std scores
new_stats['%s_min' % metric] = [] # the min scores
new_stats['%s_max' % metric] = [] # the max scores
idx = 0
for _ in range(n_iter):
fold_score = 0
n_test_samples = 0
all_fold_scores = []
for fold in range(n_folds):
this_score = self.stats[metric][idx]
this_n_test_samples = self.sample_sizes[idx]
all_fold_scores.append(this_score)
if self.iid:
this_score *= this_n_test_samples
n_test_samples += this_n_test_samples
fold_score += this_score
idx += 1
if self.iid:
fold_score /= float(n_test_samples)
else:
fold_score /= float(n_folds)
# append the mean score, and then the std of the scores for the folds
new_stats['%s_mean' % metric].append(fold_score)
new_stats['%s_std' % metric].append(np.std(all_fold_scores))
new_stats['%s_min' % metric].append(np.min(all_fold_scores))
new_stats['%s_max' % metric].append(np.max(all_fold_scores))
df = pd.DataFrame.from_dict(new_stats)
# let's order by names
return df[sorted(df.columns.values)]
    def _compute_stats(self, pred, expo, loss, prem):
        n_samples, n_groups = pred.shape[0], self.n_groups
        pred_ser = pd.Series(pred)
        # the overall loss-to-premium ratio, used to normalize
        # the per-group ratios
        loss_to_returns = np.sum(loss) / np.sum(prem)
        # bucket predictions into quantile groups. raises ValueError
        # on non-unique bin edges (handled by the caller)
        rank = pd.qcut(pred_ser, n_groups, labels=False)
        n_groups = np.amax(rank) + 1
        groups = np.arange(n_groups)  # if we ever go back to using n_groups...
tab = pd.DataFrame({
'rank': rank,
'pred': pred,
'prem': prem,
'loss': loss,
'expo': expo
})
        grouped = tab[['rank', 'pred', 'prem', 'loss', 'expo']].groupby('rank')
        # per-group loss ratio relative to the overall loss ratio
        agg_rlr = (grouped['loss'].agg(np.sum) / grouped['prem'].agg(np.sum)) / loss_to_returns
        return tab, agg_rlr, n_groups
    def score(self, _, pred, **kwargs):
"""Scores the new predictions on the truth set,
and stores the results in the internal stats array.
Parameters
----------
        _ : H2OFrame, np.ndarray
            The truth set (unused; retained to match the expected
            scorer signature)
pred : H2OFrame, np.ndarray
The predictions
Returns
-------
scr : float
The score (lift/gini) for the new predictions
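
        Examples
        --------
        Exposure and loss are passed as keyword arguments; a hypothetical
        call (arrays invented for illustration):

        >>> scr = rep.score(None, pred, expo=expo, loss=loss)  # doctest: +SKIP

        Note the returned value is sign-flipped for maximizing metrics,
        since the grid search minimizes.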
"""
scr = self._score(_, pred, True, **kwargs)
return scr
    def score_no_store(self, _, pred, **kwargs):
"""Scores the new predictions on the truth set,
and does not store the results in the internal
stats array.
Parameters
----------
        _ : H2OFrame, np.ndarray
            The truth set (unused; retained to match the expected
            scorer signature)
pred : H2OFrame, np.ndarray
The predictions
Returns
-------
scr : float
The score (lift/gini) for the new predictions
"""
scr = self._score(_, pred, False, **kwargs)
return scr
def _score(self, _, pred, store, **kwargs):
"""Scores the new predictions on the truth set.
Parameters
----------
        _ : H2OFrame, np.ndarray
            The truth set (unused; retained to match the expected
            scorer signature)
pred : H2OFrame, np.ndarray
The predictions
        store : bool
            Whether to store the results. Calls from within the grid
            search fit store the results; calls from the grid search
            ``score`` method after fitting do not.
Returns
-------
scr : float
The score (lift/gini) for the new predictions
"""
# For scoring from gridsearch...
expo, loss, prem = kwargs.get('expo'), kwargs.get('loss'), kwargs.get('prem', None)
self.fit_fold(pred, expo, loss, prem, store)
# return the score we want... grid search is MINIMIZING
# so we need to return negative for maximizing metrics
scr = self.stats[self.score_by][-1] * self._signs[self.score_by]
return scr
    def fit_fold(self, pred, expo, loss, prem=None, store=True):
"""Used to fit a single fold of predicted values,
exposure and loss data.
Parameters
----------
pred : 1d H2OFrame, pd.DataFrame, np.ndarray
The array of predictions
expo : 1d H2OFrame, pd.DataFrame, np.ndarray
The array of exposure values
loss : 1d H2OFrame, pd.DataFrame, np.ndarray
The array of loss values
prem : 1d H2OFrame, pd.DataFrame, np.ndarray, optional (default=None)
The array of premium values. If None, is
equal to the ``expo`` parameter.
store : bool, optional (default=True)
Whether or not to store the results of
the scoring procedure. This is set to false
when calling ``score``, which is intended for
test data.
Returns
-------
self
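
        Examples
        --------
        A hypothetical single-fold fit on random data:

        >>> import numpy as np
        >>> rng = np.random.RandomState(0)
        >>> rep = GainsStatisticalReport().fit_fold(
        ...     rng.rand(50), np.ones(50), rng.rand(50))
        >>> len(rep.sample_sizes)
        1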
"""
# check params
if self.error_behavior not in ('warn', 'raise', 'ignore'):
raise ValueError('error_behavior must be one of ("warn", "raise", "ignore"). '
'Encountered %s' % str(self.error_behavior))
on_error = self.error_behavior
pred, expo, loss = _as_numpy(pred, expo, loss)
if prem is None:
prem = np.copy(expo)
else:
prem = _as_numpy(prem)
# compute the stats
try:
tab, stats, n_groups = self._compute_stats(pred, expo, loss, prem)
kwargs = {
'pred': pred, 'expo': expo,
'loss': loss, 'prem': prem,
'stats': stats, 'tab': tab,
'n_groups': n_groups
}
# compute the metrics. This relies on the convention
# that the computation method is the name of the metric
# preceded by an underscore...
if store:
for metric in dict_keys(self._signs):
self.stats[metric].append(
getattr(self, '_%s' % metric)(**kwargs)
)
except ValueError as v: # for a qcut error...
if on_error == 'raise':
raise v
elif on_error == 'warn':
warnings.warn('Encountered non-unique bin edges. Score defaults to %s'
% str(self.error_score), UserWarning)
            # on 'ignore', fall through silently
if store:
for metric in dict_keys(self._signs):
self.stats[metric].append(self.error_score)
self.sample_sizes.append(pred.shape[0])
return self
    def _lift(self, **kwargs):
        agg = kwargs.pop('stats')
        n_groups = kwargs.pop('n_groups')
        # lift is the ratio of the relative loss ratios of the extreme
        # (first and last) quantile groups, oriented to be >= 1
        first, last = agg[0], agg[n_groups - 1]
        lft = last / first if last > first else first / last
        return lft
    def _gini(self, **kwargs):
        # we want a copy, not the original
        tab = kwargs.pop('tab').copy()[['pred', 'loss', 'prem', 'expo']]
        # order by prediction (ties broken by the original index) so the
        # cumulative curves follow the model's ranking
        tab['idx'] = tab.index
        tab = tab.sort_values(by=['pred', 'idx'], axis=0, inplace=False)
        # cumulative share of premium and loss captured at each row
        cpct = {x: tab[x].cumsum() / tab[x].sum() for x in ('prem', 'loss')}
        # twice the exposure-weighted mean gap between the two curves
        diff_pct = cpct['prem'] - cpct['loss']
        return 2 * np.average(diff_pct, weights=tab['expo'])