Source code for skutil.h2o.util

from __future__ import print_function, division, absolute_import
import numpy as np
import h2o
import pandas as pd

import warnings
from collections import Counter
from pkg_resources import parse_version
from ..utils import (validate_is_pd, human_bytes, corr_plot,
                     load_breast_cancer_df, load_iris_df,
                     load_boston_df)
from .frame import _check_is_1d_frame
from .select import _validate_use
from .base import check_frame
from .fixes import rbind_all

from h2o.frame import H2OFrame
from sklearn.utils.validation import check_array

__all__ = [
    'from_array',
    'from_pandas',
    'h2o_bincount',
    'h2o_col_to_numpy',
    'h2o_corr_plot',
    'h2o_frame_memory_estimate',
    'load_iris_h2o',
    'load_boston_h2o',
    'load_breast_cancer_h2o',
    'reorder_h2o_frame',
    'shuffle_h2o_frame'
]


[docs]def load_iris_h2o(include_tgt=True, tgt_name="Species", shuffle=False): """Load the iris dataset into an H2OFrame Parameters ---------- include_tgt : bool, optional (default=True) Whether or not to include the target tgt_name : str, optional (default="Species") The name of the target column. shuffle : bool, optional (default=False) Whether or not to shuffle the data """ X = from_pandas(load_iris_df(include_tgt, tgt_name, shuffle)) if include_tgt: X[tgt_name] = X[tgt_name].asfactor() return X
[docs]def load_breast_cancer_h2o(include_tgt=True, tgt_name="target", shuffle=False): """Load the breast cancer dataset into an H2OFrame Parameters ---------- include_tgt : bool, optional (default=True) Whether or not to include the target tgt_name : str, optional (default="target") The name of the target column. shuffle : bool, optional (default=False) Whether or not to shuffle the data """ X = from_pandas(load_breast_cancer_df(include_tgt, tgt_name, shuffle)) if include_tgt: X[tgt_name] = X[tgt_name].asfactor() return X
[docs]def load_boston_h2o(include_tgt=True, tgt_name="target", shuffle=False): """Load the boston housing dataset into an H2OFrame Parameters ---------- include_tgt : bool, optional (default=True) Whether or not to include the target tgt_name : str, optional (default="target") The name of the target column. shuffle : bool, optional (default=False) Whether or not to shuffle the data """ X = from_pandas(load_boston_df(include_tgt, tgt_name, shuffle)) return X
[docs]def h2o_col_to_numpy(column): """Return a 1d numpy array from a single H2OFrame column. Parameters ---------- column : H2OFrame column, shape=(n_samples, 1) A column from an H2OFrame Returns ------- np.ndarray, shape=(n_samples,) """ x = _check_is_1d_frame(column) _1d = x[x.columns[0]].as_data_frame(use_pandas=True) return _1d[_1d.columns[0]].values
def _unq_vals_col(column): """Get the unique values and column name from a column. Returns ------- str, np.ndarray : tuple (c1_nm, unq) """ unq = column.unique().as_data_frame(use_pandas=True) c1_nm = unq.columns[0] unq = unq[unq.columns[0]].sort_values().reset_index() return c1_nm, unq
[docs]def h2o_bincount(bins, weights=None, minlength=None): """Given a 1d column of non-negative ints, ``bins``, return a np.ndarray of positional counts of each int. Parameters ---------- bins : H2OFrame The values weights : list or H2OFrame, optional (default=None) The weights with which to weight the output minlength : int, optional (default=None) The min length of the output array """ bins = _check_is_1d_frame(bins) _, unq = _unq_vals_col(bins) # ensure all positive unq_arr = unq[_].values if any(unq_arr < 0): raise ValueError('values must be positive') # make sure they're all ints if np.abs((unq_arr.astype(np.int) - unq_arr).sum()) > 0: raise ValueError('values must be ints') # adjust minlength if minlength is None: minlength = 1 elif minlength < 0: raise ValueError('minlength must be positive') # create our output array all_vals = h2o_col_to_numpy(bins) output = np.zeros(np.maximum(minlength, unq_arr.max() + 1)) # check weights if weights is not None: if isinstance(weights, (list, tuple)): weights = np.asarray(weights) elif isinstance(weights, H2OFrame): weights = h2o_col_to_numpy(weights) if weights.shape[0] != all_vals.shape[0]: raise ValueError('dim mismatch in weights and bins') else: weights = np.ones(all_vals.shape[0]) # update our bins for val in unq_arr: mask = all_vals == val array_ones = np.ones(mask.sum()) weight_vals = weights[mask] output[val] = np.dot(array_ones, weight_vals) return output
[docs]def from_pandas(X): """A simple wrapper for H2OFrame.from_python. This takes a pandas dataframe and returns an H2OFrame with all the default args (generally enough) plus named columns. Parameters ---------- X : pd.DataFrame The dataframe to convert. Returns ------- H2OFrame """ pd, _ = validate_is_pd(X, None) # older version of h2o are super funky with this if parse_version(h2o.__version__) < parse_version('3.10.0.7'): h = 1 else: h = 0 # if h2o hasn't started, we'll let this fail through return H2OFrame.from_python(X, header=h, column_names=X.columns.tolist())
[docs]def from_array(X, column_names=None): """A simple wrapper for H2OFrame.from_python. This takes a numpy array (or 2d array) and returns an H2OFrame with all the default args. Parameters ---------- X : ndarray The array to convert. column_names : list, tuple (default=None) the names to use for your columns Returns ------- H2OFrame """ X = check_array(X, force_all_finite=False) return from_pandas(pd.DataFrame.from_records(data=X, columns=column_names))
[docs]def h2o_corr_plot(X, plot_type='cor', cmap='Blues_d', n_levels=5, figsize=(11, 9), cmap_a=220, cmap_b=10, vmax=0.3, xticklabels=5, yticklabels=5, linewidths=0.5, cbar_kws={'shrink': 0.5}, use='complete.obs', na_warn=True, na_rm=False): """Create a simple correlation plot given a dataframe. Note that this requires all datatypes to be numeric and finite! Parameters ---------- X : H2OFrame, shape=(n_samples, n_features) The H2OFrame plot_type : str, optional (default='cor') The type of plot, one of ('cor', 'kde', 'pair') cmap : str, optional (default='Blues_d') The color to use for the kernel density estimate plot if plot_type == 'kde' n_levels : int, optional (default=5) The number of levels to use for the kde plot if plot_type == 'kde' figsize : tuple (int), optional (default=(11,9)) The size of the image cmap_a : int, optional (default=220) The colormap start point cmap_b : int, optional (default=10) The colormap end point vmax : float, optional (default=0.3) Arg for seaborn heatmap xticklabels : int, optional (default=5) The spacing for X ticks yticklabels : int, optional (default=5) The spacing for Y ticks linewidths : float, optional (default=0.5) The width of the lines cbar_kws : dict, optional Any KWs to pass to seaborn's heatmap when plot_type = 'cor' use : str, optional (default='complete.obs') The "use" to compute the correlation matrix na_warn : bool, optional (default=True) Whether to warn in the presence of NA values na_rm : bool, optional (default=False) Whether to remove NAs """ X = check_frame(X, copy=False) corr = None if plot_type == 'cor': use = _validate_use(X, use, na_warn) cols = [str(u) for u in X.columns] X = X.cor(use=use, na_rm=na_rm).as_data_frame(use_pandas=True) X.columns = cols # set the cols to the same names X.index = cols corr = 'precomputed' else: # WARNING! This pulls everything into memory... X = X.as_data_frame(use_pandas=True) corr_plot(X, plot_type=plot_type, cmap=cmap, n_levels=n_levels, figsize=figsize, cmap_a=cmap_a, cmap_b=cmap_b, vmax=vmax, xticklabels=xticklabels, corr=corr, yticklabels=yticklabels, linewidths=linewidths, cbar_kws=cbar_kws)
[docs]def h2o_frame_memory_estimate(X, bit_est=32, unit='MB'): """We estimate the memory footprint of an H2OFrame to determine, possibly, whether it's capable of being held in memory or not. Parameters ---------- X : H2OFrame The H2OFrame in question bit_est : int, optional (default=32) The estimated bit-size of each cell. The default assumes each cell is a signed 32-bit float unit : str, optional (default='MB') The units to report. One of ('MB', 'KB', 'GB', 'TB') Returns ------- mb : str The estimated number of UNIT held in the frame """ X = check_frame(X, copy=False) n_samples, n_features = X.shape n_bits = (n_samples * n_features) * bit_est n_bytes = n_bits // 8 return human_bytes(n_bytes, unit)
def _gen_optimized_chunks(idcs): """Given the list of indices, create more efficient chunks to minimize the number of rbind operations required for the H2OFrame ExprNode cache. """ idcs = sorted(idcs) counter = Counter(idcs) counts = counter.most_common() # order desc # the first index is the number of chunks we'll need to create. n_chunks = counts[0][1] chunks = [[] for _ in range(n_chunks)] # gen the number of chunks we'll need # 1. populate the chunks each with their first idx (the most common) # 2. pop from the counter # 3. re-generate the most_common(), repeat while counts: val, n_iter = counts[0] # the one at the head of the list is the most common for i in range(n_iter): chunks[i].append(val) counts.pop(0) # pop out the first idx... # sort them return [sorted(chunk) for chunk in chunks]
[docs]def reorder_h2o_frame(X, idcs, from_chunks=False): """Currently, H2O does not allow us to reorder frames. This is a hack to rbind rows together in the order prescribed. Parameters ---------- X : H2OFrame The H2OFrame to reorder idcs : iterable The order of the H2OFrame rows to be returned. from_chunks : bool, optional (default=False) Whether the elements in ``idcs`` are optimized chunks generated by ``_gen_optimized_chunks``. Returns ------- new_frame : H2OFrame The reordered H2OFrame """ # hack... slow but functional X = check_frame(X, copy=False) # we're rbinding. no need to copy # to prevent rbinding rows over, and over, and over # create chunks. Rbind chunks that are progressively increasing. # once we hit an index that decreases, rbind, and then start the next chunk last_index = np.inf chunks = [] # all of the chunks chunk = [] # the current chunk being built for i in idcs: # if it's a chunk from balancer: if from_chunks: # probably a list of indices chunks.append(X[i, :]) # otherwise chunks have not been computed else: # while the indices increase adjacently if i < last_index: last_index = i chunk.append(i) # otherwise, they are no longer increasing else: # if a chunk exists if chunk: # there should ALWAYS be a chunk rows = X[chunk, :] else: rows = X[i, :] # append the chunk and reset the list chunks.append(rows) chunk = [] last_index = np.inf # print([type(c) for c in chunks]) # couldn't figure out an issue for a while... return rbind_all(*chunks)
[docs]def shuffle_h2o_frame(X): """Currently, H2O does not allow us to shuffle frames. This is a hack to rbind rows together in the order prescribed. Parameters ---------- X : H2OFrame The H2OFrame to reorder Returns ------- shuf : H2OFrame The shuffled H2OFrame """ warnings.warn('Shuffling H2O frames will eventually be deprecated, as H2O ' 'does not allow re-ordering of frames by row. The current work-around ' '(rbinding the rows) is known to cause issues in the H2O ExprNode ' 'cache for very large frames.', DeprecationWarning) X = check_frame(X, copy=False) idcs = np.random.permutation(np.arange(X.shape[0])) shuf = reorder_h2o_frame(X, idcs) # do not generate optimized chunks here... return shuf