#!/usr/bin/env python
#
# processing_functions_core.py - Functions used by processing_functions.
#
# Author: Paul McCarthy <pauldmccarthy@gmail.com>
#
"""This module contains utility algorithms and functions used by the
:mod:`processing_functions` module.
.. autosummary::
:nosignatures:
isSparse
naCorrelation
pairwiseRedundantColumns
matrixRedundantColumns
binariseCategorical
expandCompound
"""
import enum
import logging
import collections
import functools as ft
import multiprocessing as mp
import multiprocessing.dummy as mpd
from typing import Optional, Tuple, List, Union, Any
import numpy as np
import pandas as pd
import pandas.api.types as pdtypes
import funpack.util as util
log = logging.getLogger(__name__)
def isSparse(
data : pd.Series,
ctype : Optional[enum.Enum] = None,
minpres : Optional[float] = None,
minstd : Optional[float] = None,
mincat : Optional[float] = None,
maxcat : Optional[float] = None,
abspres : bool = True,
abscat : bool = True,
naval : Optional[Any] = None
) -> Tuple[bool, Union[str, None], Any]:
"""Returns ``True`` if the given data looks sparse, ``False`` otherwise.
Used by :func:`.removeIfSparse` - see that function for details.
By default, the ``minstd`` test is only performed on numeric columns, and
the ``mincat``/``maxcat`` tests are only performed on integer/categorical
columns. This behaviour can be overridden by passing in a ``ctype`` of
``None``, in which case all specified tests will be performed.
:arg data:    ``pandas.Series`` containing the data
:arg ctype:   The series column type (one of the :attr:`.util.CTYPES`
              values), or ``None`` to disable type-specific logic.
:arg minpres: Minimum number (or proportion, if ``abspres=False``) of
              values which must be present.
:arg minstd:  Minimum standard deviation that the data must have.
:arg mincat:  Minimum number (or proportion, if ``abscat=False``) of
              values that the smallest category must have.
:arg maxcat:  Maximum number (or proportion, if ``abscat=False``) of
              values that the largest category may have.
:arg abspres: If ``True`` (the default), ``minpres`` is interpreted as an
              absolute count, otherwise as a proportion of the total
              number of rows.
:arg abscat:  If ``True`` (the default), ``mincat``/``maxcat`` are
              interpreted as absolute counts, otherwise as proportions
              of the number of present values.
:arg naval:   Value to consider as "missing". If not provided, null
              values are considered missing.
:returns: A tuple containing:
- ``True`` if the data is sparse, ``False`` otherwise.
- If the data is sparse, one of ``'minpres'``,
``'minstd'``, ``'mincat'``, or ``'maxcat'``, indicating
the cause of its sparsity. ``None`` if the data is not
sparse.
- If the data is sparse, the value of the criteria which
caused the data to fail the test. ``None`` if the data
is not sparse.
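
A minimal usage sketch (values chosen purely for illustration)::

    s = pd.Series([1, 1, 1, 1, 2] + [np.nan] * 5)

    # fewer than 6 of the 10 values are present
    isSparse(s, minpres=6)                   # (True, 'minpres', 5)

    # the largest category holds more than
    # half of the present values
    isSparse(s, maxcat=0.5, abscat=False)    # (True, 'maxcat', 4)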
"""
if naval is None: presmask = data.notnull()
else: presmask = data != naval
present = data[presmask]
ntotal = len(data)
npresent = len(present)
def fixabs(val, isabs, limit=ntotal, repl=npresent):
# Turn proportion into
# an absolute count
if not isabs:
val = val * limit
if (val % 1) >= 0.5: val = np.ceil(val)
else: val = np.floor(val)
# ignore the threshold if it is
# bigger than the total data length
if limit < val: return repl
else: return val
iscategorical = ctype in (None,
util.CTYPES.integer,
util.CTYPES.categorical_single,
util.CTYPES.categorical_single_non_numeric,
util.CTYPES.categorical_multiple,
util.CTYPES.categorical_multiple_non_numeric)
isnumeric = ctype in (None,
util.CTYPES.continuous,
util.CTYPES.integer,
util.CTYPES.categorical_single,
util.CTYPES.categorical_multiple) and \
pdtypes.is_numeric_dtype(data)
# not enough values
if minpres is not None:
if npresent < fixabs(minpres, abspres):
return True, 'minpres', npresent
# stddev is not large enough (for
# numerical/categorical types)
if isnumeric and minstd is not None:
std = (present - present.mean()).std()
if std <= minstd:
return True, 'minstd', std
# for categorical types
if iscategorical and ((maxcat is not None) or (mincat is not None)):
if maxcat is not None:
maxcat = fixabs(maxcat, abscat, npresent, npresent + 1)
if mincat is not None:
mincat = fixabs(mincat, abscat, npresent)
# mincat - smallest category is too small
# maxcat - one category is too dominant
uniqvals = pd.unique(present)
uniqcounts = [sum(present == u) for u in uniqvals]
# Column is empty - force the test to fail
if len(uniqcounts) == 0:
if mincat is not None:
mincat = 1
nmincat = 0
if maxcat is not None:
maxcat = 0
nmaxcat = 0
else:
nmincat = min(uniqcounts)
nmaxcat = max(uniqcounts)
if mincat is not None:
if nmincat < mincat:
return True, 'mincat', nmincat
if maxcat is not None:
if nmaxcat >= maxcat:
return True, 'maxcat', nmaxcat
return False, None, None
def naCorrelation(
namask : np.ndarray,
nathres : float,
nacounts : Optional[np.ndarray] = None
) -> np.ndarray:
"""Compares the missingness correlation of every pair of columns in
``namask``.
:arg namask: 2D ``numpy`` array of shape ``(nsamples, ncolumns)``
containing binary missingness classification
:arg nathres: Threshold above which column pairs should be classed
as correlated.
:arg nacounts: 1D ``numpy`` array containing the number of missing values
in each column. Calculated if not provided.
:returns: A ``(ncolumns, ncolumns)`` boolean ``numpy`` array
containing ``True`` for column pairs which exceed
``nathres``, ``False`` otherwise.
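
A minimal usage sketch (illustrative data only)::

    # columns 0 and 1 have identical missingness
    # patterns; column 2 does not
    namask = np.array([[1, 1, 1],
                       [1, 1, 0],
                       [0, 0, 0],
                       [0, 0, 0]], dtype=bool)

    nacorr = naCorrelation(namask, nathres=0.9)
    # nacorr[0, 1] is True, nacorr[0, 2] is False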
"""
log.debug('Calculating missingness correlation '
'(%u columns)', namask.shape[1])
if nacounts is None:
nacounts = namask.sum(axis=0)
# must be float32 to take
# advantage of parallelism
namask = namask.astype(np.float32)
nacorr = np.corrcoef(namask.T)
nacorr = np.abs(nacorr) > nathres
# columns with no/all missing values
# will contain invalid correlation
# values, so we flag them as correlated -
# these results will be combined with
# a data correlation test.
flatmask = (nacounts == 0) | (nacounts == namask.shape[0])
nacorr[flatmask, :] = True
nacorr[:, flatmask] = True
return nacorr
def pairwiseRedundantColumns(
data : pd.DataFrame,
colpairs : np.ndarray,
corrthres : float,
token : Optional[str] = None
) -> List[Tuple[int, int]]:
"""Identifies redundant columns based on their correlation with each
other by comparing each pair of columns one by one.
:arg data: ``pandas.DataFrame`` containing the data
:arg colpairs: ``numpy`` array of shape ``(N, 2)``, containing indices
of the column pairs to be evaluated.
:arg corrthres: Correlation threshold - columns with a correlation greater
than this are identified as redundant.
:arg token: Identifier string for log messages.
:returns: Sequence of tuples of column indices, where each tuple
``(a, b)`` indicates that column ``a`` is redundant with
respect to column ``b``.
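
A minimal usage sketch (illustrative data only)::

    df    = pd.DataFrame({'a' : [1, 2, 3, 4],
                          'b' : [2, 4, 6, 8],
                          'c' : [4, 3, 2, 1]})
    pairs = np.array([[0, 1], [0, 2]])

    # 'b' and 'c' are perfectly (anti-)correlated with 'a',
    # so both are flagged as redundant w.r.t. column 0
    pairwiseRedundantColumns(df, pairs, corrthres=0.9)  # [(1, 0), (2, 0)]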
"""
if len(colpairs) == 0:
return []
if token is None: token = ''
else: token = '[{}] '.format(token)
redundant = {}
nacounts = data.isna().sum(axis=0).to_numpy()
# calculate correlation between column pairs
for i, (coli, colj) in enumerate(colpairs):
if i % 1000 == 0:
log.debug('%sTesting column pair %u / %u',
token, i + 1, len(colpairs))
datai = data.iloc[:, coli]
dataj = data.iloc[:, colj]
corr = np.abs(datai.corr(dataj))
# i and j are highly correlated -
# flag the one with more missing
# values as redundant
if corr > corrthres:
if nacounts[coli] > nacounts[colj]: drop, keep = coli, colj
else: drop, keep = colj, coli
if drop not in redundant:
log.debug('%sColumn %s is redundant (correlation with %s: %f)',
token, data.columns[drop], data.columns[keep], corr)
redundant[drop] = keep
return list(redundant.items())
def matrixRedundantColumns(
data : pd.DataFrame,
corrthres : float,
nathres : Optional[float] = None,
precision : Optional[str] = None) -> List[Tuple[int, int]]:
"""Identifies redundant columns based on their correlation with each
other using dot products to calculate a correlation matrix.
:arg data: ``pandas.DataFrame`` containing the data
:arg corrthres: Correlation threshold - columns with a correlation greater
than this are identified as redundant.
:arg nathres: Correlation threshold to use for missing values. If
provided, columns must have a correlation greater than
``corrthres`` *and* a missing-value correlation greater
than ``nathres`` to be identified as redundant.
:arg precision: ``'double'`` (the default) or ``'single'``, specifying
the floating point precision to use. Note that the
algorithm used to calculate the correlation values is
unstable for data with a range larger than ~10e5, so
double precision is used by default.
:returns: Sequence of tuples of column indices, where each tuple
``(a, b)`` indicates that column ``a`` is redundant with
respect to column ``b``.
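
A minimal usage sketch (illustrative data only)::

    df = pd.DataFrame({'a' : [1, 2, 3, 4],
                       'b' : [2, 4, 6, 8],
                       'c' : [5, 1, 2, 7]})

    # column 'b' (index 1) is flagged as redundant
    # with respect to column 'a' (index 0)
    matrixRedundantColumns(df, corrthres=0.9)  # [(1, 0)]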
"""
if len(data.columns) < 2:
return []
if precision == 'single':
dtype = np.float32
elif precision in (None, 'double'):
dtype = np.float64
else:
raise ValueError(f'Invalid precision: {precision}')
# Keep a copy of the column names for logging.
# Create a 2D matrix containing all data
columns = data.columns
data = data.to_numpy(dtype=dtype, copy=True)
namask = np.isnan(data)
nacounts = namask.sum(axis=0)
# missingness correlation
if nathres is None: nacorr = None
else: nacorr = naCorrelation(namask, nathres, nacounts)
log.debug('Calculating pairwise correlations '
'(matrix shape: %s)', data.shape)
# zero out nan elements
data[namask] = 0
# p=present elements
namask = (~namask).astype(dtype)
Ap = Bp = namask
A = B = data
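# Pearson correlation, restricted for each column
# pair (x, y) to the rows which are present in both
# x and y (tracked via the Ap/Bp masks):
#
#   r = (sum(x*y) - n*mx*my) / (sx * sy)
#
# where n, mx, my, sx and sy are all calculated
# over the jointly-present rows.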
# sum x_i y_i
xy = A.T @ B
# number of items defined both in x and y
n = Ap.T @ Bp
# mean values in x, calculated across items defined both in x and y
# mean values in y, calculated across items defined both in x and y
mx = (A.T @ Bp) / n
my = (Ap.T @ B) / n
# sum x^2_i, calculated across items defined both in x and y
# sum y^2_i, calculated across items defined both in x and y
x2 = (A * A).T @ Bp
y2 = Ap.T @ (B * B)
# sx, sy - standard deviations
# sx = sqrt(x2 - n .* (mx.^2));
# sy = sqrt(y2 - n .* (my.^2));
sx = x2 - n * (mx ** 2)
sx[sx < 0] = 0
sx = np.sqrt(sx)
sy = y2 - n * (my ** 2)
sy[sy < 0] = 0
sy = np.sqrt(sy)
# correlation coefficient
coef = (xy - n * mx * my) / (sx * sy)
# ignore nans/infs, binarise. Keep a copy
# of the raw correlations for logging.
rawcoef = coef
coef[~np.isfinite(coef)] = 0
coef = np.abs(coef) > corrthres
# columns must have a correlation greater than
# corrthres and a missing-value correlation greater
# than nathres to be identified as redundant.
if nacorr is not None:
coef = coef & nacorr
# generate indices of correlated column
# pairs; we only need to consider one half
# of the triangle
coef = np.triu(coef, k=1)
colpairs = np.vstack(np.where(coef))
# for each correlated pair, we flag the
# one with more missing values as redundant
def correlatedPairs(coli, colj):
if nacounts[coli] > nacounts[colj]: drop, keep = coli, colj
else: drop, keep = colj, coli
# coef is only the upper triangle
if coli <= colj: corr = rawcoef[coli, colj]
else: corr = rawcoef[colj, coli]
log.debug('Column %s is redundant (correlation with %s: %0.6f)',
columns[drop], columns[keep], corr)
return drop, keep
# Generate a sequence of pairs, where the first
# element is the column to drop, and the second
# element is the column it is redundant w.r.t.
correlatedPairs = np.vectorize(correlatedPairs, [np.uint32, np.uint32])
colsa, colsb = correlatedPairs(colpairs[0], colpairs[1])
if len(colsa) == 0:
return []
# Return only one pair for each column
# that is to be dropped (the first
# pair in the natural column ordering).
idxs = np.unique(colsa, return_index=True)[1]
colsa = colsa[idxs]
colsb = colsb[idxs]
return list(zip(colsa, colsb))
def binariseCategorical(
data : pd.DataFrame,
minpres : Optional[int] = None,
take : Optional[pd.DataFrame] = None,
token : Optional[str] = None,
njobs : Optional[int] = None
) -> Tuple[np.ndarray, np.ndarray]:
"""Takes one or more columns containing categorical data,, and generates a
new set of binary columns (as ``np.uint8``), one for each unique
categorical value, containing ``1`` in rows where the value was present,
``0`` otherwise.
:arg data: A ``pandas.DataFrame`` containing the input columns
:arg minpres: Optional threshold - values with less than this number of
occurrences will not be included in the output
:arg take: Optional ``pandas.DataFrame`` containing values to use in
the output. Instead of using binary ``0``/``1`` values,
rows where a unique value is present will be populated with
the corresponding value from ``take``, and rows where a
unique value is not present will be populated with
``np.nan``. Must contain the same number of columns (and
rows) as ``data``.
:arg token: Unique token to identify this function call, to make it
re-entrant (in case multiple calls are made in a parallel
execution environment).
:arg njobs: Number of jobs to parallelise tasks with - the
:func:`generateBinaryColumns` function is called in parallel
for different blocks of unique values.
:returns: A tuple containing:
- A ``(nrows, nvalues)`` ``numpy`` array containing the
generated binary columns
- A sequence of length ``nvalues`` containing the unique
  values that are encoded in the binary columns.
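
A minimal usage sketch (illustrative data only)::

    df = pd.DataFrame({'v1' : [1, 2, 3],
                       'v2' : [2, 3, np.nan]})

    bindata, uniq = binariseCategorical(df)
    # uniq contains [1, 2, 3]
    # bindata is   [[1, 1, 0],
    #               [0, 1, 1],
    #               [0, 0, 1]]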
"""
if njobs is None: njobs = 1
if njobs < 1: njobs = 1
if njobs == 1: Pool = mpd.Pool
else: Pool = mp.Pool
if (take is not None) and take.shape != data.shape:
takes = take.shape
datas = data.shape
takec = take.columns.difference(data.columns)
datac = data.columns.difference(take.columns)
raise ValueError('take {} must have same shape as data {}. '
'Column difference: {} -- {}'.format(
takes, datas, datac, takec))
if minpres is None:
minpres = 0
if token is None: token = ''
else: token = '[{}] '.format(token)
log.debug('%sCounting unique values across %u '
'columns...', token, len(data.columns))
cols = [data[c] for c in data.columns]
cols = [c[c.notna()] for c in cols]
coluniq = [np.unique(c, return_counts=True) for c in cols]
uniq = collections.defaultdict(lambda : 0)
for colvals, colcounts in coluniq:
for val, count in zip(colvals, colcounts):
uniq[val] += count
# drop values with low occurrence
uniq = [v for v, c in uniq.items() if c >= minpres]
uniq = list(sorted(uniq))
if len(uniq) == 0:
return np.zeros((len(data), 0)), np.zeros((0,))
log.debug('%sCounted %u unique values [minpres: %u]',
token, len(uniq), minpres)
# Prepare inputs for parallelising the
# binarise using binariseUniqueValues
data = data.to_numpy()
# Figure out the type and fill value
# of the output array. If take is
# provided, we assume that every
# column in it has the same dtype
if take is None:
dtype = np.uint8
fill = 0
else:
dtype = take.dtypes.iloc[0]
take = take.to_numpy()
if np.issubdtype(dtype, np.integer): fill = 0
elif np.issubdtype(dtype, np.datetime64): fill = np.datetime64('NAT')
else: fill = np.nan
# We parallelise binarisation across
# blocks of unique values - each call
# to generateBinaryColumns will binarise
# one block. Only read access is needed
# for the uniq, data, and take arrays,
# so they are made available at the
# module level (and thus accessible to
# the worker processes in shared parent
# process memory).
#
# Write access is needed for the bindata
# array (where the result is written),
# so it is shared as a mp.RawArray.
#
# If take is None, bindata is a uint8
# array; otherwise it is a take.dtype
# array. If take has a dtype of
# np.datetime64, we need a hack, as we
# can't create a sharedmem array of that
# type (it is not supported by
# numpy.ctypeslib.as_ctypes_type). So
# for datetime, we make bindata uint64,
# and then cast it back to datetime64
# afterwards.
if np.issubdtype(dtype, np.datetime64):
cdtype = np.uint64
else:
cdtype = dtype
# Shared array to store binarised
# columns. If not parallelising,
# we use a regular numpy array
binshape = (len(data), len(uniq))
if njobs > 1:
rawbindata = mp.RawArray(np.ctypeslib.as_ctypes_type(cdtype),
int(np.prod(binshape)))
bindata = np.ctypeslib.as_array(rawbindata).reshape(binshape)
else:
bindata = np.empty(binshape, dtype=dtype)
# make the inputs module-level accessible
# before creating worker processes. We
# use a unique token for this invocation
# in case this function has been called
# multiple times
if not hasattr(generateBinaryColumns, 'inputs'):
generateBinaryColumns.inputs = {}
generateBinaryColumns.inputs[token] = (uniq, data, bindata, take)
# create an offset into the uniq
# list for each worker process
valsperjob = int(np.ceil(len(uniq) / njobs))
offsets = np.arange(0, len(uniq), valsperjob)
func = ft.partial(generateBinaryColumns,
nvals=valsperjob,
fill=fill,
token=token)
try:
with Pool(njobs) as pool:
pool.map(func, offsets)
pool.close()
pool.join()
finally:
generateBinaryColumns.inputs.pop(token)
# cast the result if necessary
if dtype != cdtype:
bindata = bindata.astype(dtype)
return bindata, uniq
def generateBinaryColumns(offset, nvals, fill, token):
"""Called by :func:`binariseCategorical`. Generates binarised columns for
a subset of unique values for a data set.
The following arguments are passed internally from
:func:`binariseCategorical` to this function:
- Sequence of unique values
- Numpy array containing the data
- Numpy array to store the output
- Numpy array to take values from
:arg offset: Offset into the sequence of unique values
:arg nvals: Number of unique values to binarise
:arg fill: Default value
:arg token: Unique token used to retrieve the data for one invocation of
this function.
"""
# get refs to the shared inputs,
uniq, data, bindata, take = generateBinaryColumns.inputs[token]
# are all values being binarised in one go?
allvals = (offset == 0) and (len(uniq) == nvals)
# don't overflow on the last
# block of unique values
if offset + nvals > len(uniq):
nvals = len(uniq) - offset
# rather than writing to bindata directly
# (which could be a mem-mapped file,
# therefore slow), we write to a smaller
# in-memory array and then copy afterwards
# (but not if this function call is
# binarising all unique values)
if allvals: binblock = bindata
else: binblock = np.empty((len(data), nvals), bindata.dtype)
binblock[:] = fill
for i in range(nvals):
v = uniq[i + offset]
if (i + offset) % 100 == 0:
log.debug('%sUnique value %u / %u [%s] ...',
token, i + offset + 1, len(uniq), v)
mask = data == v
# if take is None, each column is 1
# where the subject had an entry
# equal to that value, 0 otherwise.
if take is None:
binblock[:, i] = mask.any(axis=1)
# if take is provided, each column
# contains the value from take where
# the subject had an entry equal to
# that value, or nan otherwise.
# If a subject had multiple entries
# equal to value, the first
# corresponding entry from take is
# used (found via np.argmax)
else:
rowmask = mask.any(axis=1)
idxs = np.argmax(mask, axis=1)
values = take[np.arange(take.shape[0]), idxs]
binblock[rowmask, i] = values[rowmask]
# copy results to the output array
if not allvals:
bindata[:, offset:offset + nvals] = binblock
def expandCompound(data : pd.Series) -> np.ndarray:
"""Takes a ``pandas.Series`` containing sequence data (potentially of
variable length), and returns a 2D ``numpy`` array containing the parsed
data.
The returned array has shape ``(X, Y)``, where ``X`` is the number of rows
in the data, and ``Y`` is the maximum number of values for any of the
entries.
:arg data: ``pandas.Series`` containing the compound data.
:returns: 2D ``numpy`` array containing expanded data.
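
A minimal usage sketch (illustrative data only)::

    s = pd.Series([[1, 2, 3], [4], [5, 6]])

    expandCompound(s)
    # [[ 1.,  2.,  3.],
    #  [ 4., nan, nan],
    #  [ 5.,  6., nan]]   (float32, padded with nan)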
"""
nrows = len(data)
lens = data.apply(len).values
ncols = max(lens)
# Create a 2D array from
# rows of different lengths
#
# https://stackoverflow.com/a/32043366
#
# 2D mask array of shape (nrows, max(lens))
# which identifies positions to be filled
mask = np.arange(ncols) < np.atleast_2d(lens).T
newdata = np.full((nrows, ncols), np.nan, dtype=np.float32)
newdata[mask] = np.concatenate(data.values)
return newdata