Source code for funpack.importing.core

#!/usr/bin/env python
#
# core.py - The data import stage
#
# Author: Paul McCarthy <pauldmccarthy@gmail.com>
#
"""This module provides the :func:`importData` function, which implements
the data importing stage of the ``funpack`` sequence
"""


import itertools             as it
import functools             as ft
import multiprocessing       as mp
import multiprocessing.dummy as mpd
import                          logging
import                          warnings
import                          collections

import pandas                as pd
import numpy                 as np
import funpack.util          as util
import funpack.custom        as custom
import funpack.merging       as merging
import funpack.datatable     as datatable
import funpack.loadtables    as loadtables
from . import                   filter   # noqa
from . import                   reindex


log = logging.getLogger(__name__)


NUM_ROWS = 10000
"""Default number of rows read at a time by :func:`loadData` - it reads the
data in chunks.
"""


MERGE_AXIS = 'variables'
"""Default merge axis when loading multiple data files - see :func:`mergeData`.
"""


MERGE_STRATEGY = 'intersection'
"""Default merge strategy when loading multiple data files - see
:func:`mergeData`.
"""


MERGE_AXIS_OPTIONS = ['0', 'rows', 'subjects',
                      '1', 'cols', 'columns', 'variables']
"""Values accepted for the ``axis`` option to the :func:`mergeData` function.
"""


MERGE_STRATEGY_OPTIONS = ['naive', 'union', 'intersection', 'inner', 'outer']
"""Values accepted for the ``strategy`` option to the :func:`mergeData`
function.
"""


class VariableMissing(ValueError):
    """Exception raised by :func:`importData` when ``--fail_if_missing`` /
    ``failIfMissing`` is active, and a requested variable is not present in
    the input files.
    """

def importData(fileinfo,
               vartable,
               proctable,
               cattable,
               variables=None,
               colnames=None,
               excludeColnames=None,
               categories=None,
               subjects=None,
               subjectExprs=None,
               exclude=None,
               excludeVariables=None,
               excludeCategories=None,
               trustTypes=False,
               mergeAxis=None,
               mergeStrategy=None,
               indexVisits=False,
               dropNaRows=False,
               addAuxVars=False,
               njobs=1,
               mgr=None,
               dryrun=False,
               failIfMissing=False):
    """The data import stage.

    This function does the following:

     1. Figures out which columns to load (using the :func:`.columnsToLoad`
        function).

     2. Loads the data (using :func:`loadFiles`).

     3. Creates and returns a :class:`DataTable`.

    :arg fileinfo:          :class:`.FileInfo` object describing the input
                            file(s).

    :arg vartable:          The variable table

    :arg proctable:         The processing table

    :arg cattable:          The category table

    :arg variables:         List of variable IDs to import

    :arg colnames:          List of names/glob-style wildcard patterns
                            specifying columns to import.

    :arg excludeColnames:   List of column name suffixes specifying columns
                            to exclude.

    :arg categories:        List of category names/IDs to import

    :arg subjects:          List of subjects to include

    :arg subjectExprs:      List of subject inclusion expressions

    :arg exclude:           List of subjects to exclude

    :arg excludeVariables:  List of variables to exclude

    :arg excludeCategories: List of category names/IDs to exclude

    :arg trustTypes:        If ``True``, it is assumed that columns with a
                            known data type do not contain any
                            bad/unparseable values. This improves
                            performance, but will cause an error if the
                            assumption does not hold.

    :arg mergeAxis:         Merging axis to use when loading multiple data
                            files - see the :func:`mergeData` function.

    :arg mergeStrategy:     Merging strategy to use when loading multiple
                            data files - see the :func:`mergeData` function.

    :arg indexVisits:       Re-arrange the data so that rows are indexed by
                            subject ID and visit, rather than visits being
                            split into separate columns. Only applied to
                            variables which are labelled with Instancing 2.

    :arg dropNaRows:        If ``True``, rows which do not contain data for
                            any columns are not loaded.

    :arg addAuxVars:        If ``True``, data fields which are referred to
                            in the processing rules are selected for import
                            if present in the input file(s) and not already
                            selected. See the ``take`` argument to
                            :func:`.processing_functions.binariseCategorical`
                            for an example.

    :arg njobs:             Number of processes to use for parallelising
                            tasks.

    :arg mgr:               :class:`multiprocessing.Manager` object for
                            parallelisation

    :arg dryrun:            If ``True`` the data is not loaded.

    :arg failIfMissing:     Defaults to ``False``. If ``True``, and a
                            requested variable is not present in the input
                            files, a ``VariableMissing`` error is raised.

    :returns: A tuple containing:

               - A :class:`DataTable`, which contains references to the
                 data, and the variable and processing tables.

               - A list of :class:`.Column` objects that were not loaded
                 from each input file.
    """

    # generate a list of variable IDs to include/exclude,
    # from variable/category selection/deselection options
    variables, excludevars = filter.restrictVariables(
        cattable, variables, categories, excludeVariables, excludeCategories)

    # make sure auxiliary variables are to be imported
    # (e.g. binariseCategorical(take))
    if addAuxVars:
        variables, excludevars = filter.addAuxillaryVariables(
            fileinfo, proctable, variables, excludevars)

    # Warn/error if any requested
    # variables are not present
    if variables is not None:
        for vid in variables:
            if vid not in fileinfo.allVariables:
                msg = f'Requested data field {vid} is ' \
                      'not present in input file(s).'
                if failIfMissing: raise VariableMissing(msg)
                else:             log.warning(msg)

    # Figure out which columns to load
    cols, drop = filter.columnsToLoad(fileinfo,
                                      vartable,
                                      variables,
                                      excludevars,
                                      colnames,
                                      excludeColnames)

    # Load those columns, merging
    # multiple input files.
    if njobs > 1: Pool = mp.Pool
    else:         Pool = mpd.Pool

    with Pool(njobs) as pool:
        data, cols = loadFiles(fileinfo,
                               vartable,
                               cols,
                               subjects=subjects,
                               subjectExprs=subjectExprs,
                               exclude=exclude,
                               mergeAxis=mergeAxis,
                               mergeStrategy=mergeStrategy,
                               indexVisits=indexVisits,
                               dropNaRows=dropNaRows,
                               trustTypes=trustTypes,
                               pool=pool,
                               dryrun=dryrun)
        pool.close()
        pool.join()

    # Re-order the columns according to
    # specified variables, if provided
    if variables is not None:

        # Build a list of all loaded vids -
        # this will include those loaded
        # via the colnames argument
        allvars = variables
        for c in cols:
            if c.vid == 0:
                continue
            if c.vid not in allvars:
                allvars.insert(0, c.vid)

        # organise columns by vid
        # (skipping the index column)
        newcols = collections.defaultdict(list)
        for c in cols:
            if c.vid == 0:
                continue
            newcols[c.vid].append(c)

        # order them by the variable list
        # (including the ID column(s) for
        # the first file)
        cols = list(it.chain([cols[0]], *[newcols[v] for v in allvars]))

        if not dryrun:
            data = data[[c.name for c in cols[1:]]]

    dtable = datatable.DataTable(
        data, cols, vartable, proctable, cattable, njobs, mgr)

    return dtable, drop

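
# Illustration only (not part of funpack): a minimal sketch of the column
# re-ordering step in importData above, using hypothetical column names and
# variable IDs. Non-index columns are grouped by variable ID, then chained
# back together in the requested variable order, keeping the index column
# first.
def _example_column_reordering():
    variables = [31, 21003]                     # requested variable order
    colnames  = ['eid', '21003-0.0', '31-0.0']  # loaded column names
    vids      = [0, 21003, 31]                  # corresponding variable IDs

    # group non-index columns by variable ID
    newcols = collections.defaultdict(list)
    for name, vid in zip(colnames, vids):
        if vid != 0:
            newcols[vid].append(name)

    # chain them back together in the requested
    # order, keeping the index column first
    ordered = list(it.chain([colnames[0]], *[newcols[v] for v in variables]))
    assert ordered == ['eid', '31-0.0', '21003-0.0']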
def loadFiles(fileinfo,
              vartable,
              columns,
              nrows=None,
              subjects=None,
              subjectExprs=None,
              exclude=None,
              trustTypes=False,
              mergeAxis=None,
              mergeStrategy=None,
              indexVisits=False,
              dropNaRows=False,
              pool=None,
              dryrun=False):
    """Load data from ``fileinfo.datafiles``, using :func:`.mergeDataFrames`
    if multiple files are provided.

    :arg fileinfo:      :class:`.FileInfo` object describing the input
                        file(s).

    :arg vartable:      Variable table

    :arg columns:       Dict of ``{ file : [Column] }`` mappings, defining
                        the columns to load, as returned by
                        :func:`columnsToLoad`.

    :arg nrows:         Number of rows to read at a time. Defaults to
                        :attr:`NUM_ROWS`.

    :arg subjects:      List of subjects to include.

    :arg subjectExprs:  List of subject inclusion expressions

    :arg exclude:       List of subjects to exclude

    :arg trustTypes:    Assume that columns with known data type do not
                        contain any bad/unparseable values.

    :arg mergeAxis:     Merging axis to use when loading multiple data
                        files - see the :func:`mergeDataFrames` function.
                        Defaults to :attr:`MERGE_AXIS`.

    :arg mergeStrategy: Strategy for merging multiple data files - see the
                        :func:`mergeData` function. Defaults to
                        :attr:`MERGE_STRATEGY`.

    :arg indexVisits:   Re-arrange the data so that rows are indexed by
                        subject ID and visit, rather than visits being
                        split into separate columns. Only applied to
                        variables which are labelled with Instancing 2.

    :arg dropNaRows:    If ``True``, rows which do not contain data for any
                        columns are not loaded.

    :arg pool:          ``multiprocessing.Pool`` object for running tasks
                        in parallel.

    :arg dryrun:        If ``True``, the data is not loaded.

    :returns: A tuple containing:

               - A ``pandas.DataFrame`` containing the data, or ``None`` if
                 ``dryrun is True``.

               - A list of :class:`.Column` objects representing the
                 columns that were loaded.
    """

    if mergeStrategy is None: mergeStrategy = MERGE_STRATEGY
    if mergeAxis     is None: mergeAxis     = MERGE_AXIS

    # load the data
    data       = []
    loadedcols = []

    for fname in fileinfo.datafiles:

        toload = columns[fname]
        loader = fileinfo.loader(fname)
        fdata  = None
        fcols  = None

        if loader is not None:
            log.debug('Loading %s with custom loader %s', fname, loader)
            fdata = custom.runLoader(loader, fname)
            fcols = toload
        else:
            log.debug('Loading %s with pandas', fname)
            fdata, fcols = loadFile(fname,
                                    fileinfo,
                                    vartable,
                                    toload,
                                    nrows=nrows,
                                    subjects=subjects,
                                    subjectExprs=subjectExprs,
                                    exclude=exclude,
                                    indexVisits=indexVisits,
                                    dropNaRows=dropNaRows,
                                    trustTypes=trustTypes,
                                    pool=pool)

        data      .append(fdata)
        loadedcols.append(fcols)

    # Merge data from multiple files
    # into a single dataframe
    data, cols = merging.mergeDataFrames(
        data, loadedcols, mergeAxis, mergeStrategy, dryrun)

    # if a subject list was provided,
    # re-order the data according to
    # that list
    if (not dryrun) and subjects is not None:

        # if exclude/subjectExpr lists were
        # provided, and they overlap with
        # the subjects list, there will be
        # more IDs in the subject list than
        # the dataframe. Fix it.
        if len(data.index) != len(subjects):
            if data.index.nlevels == 1: oldsubjects = data.index
            else:                       oldsubjects = data.index.levels[0]
            subjects = pd.Index(subjects, name=data.index.names[0])
            subjects = subjects.intersection(oldsubjects, sort=False)

        if data.index.nlevels == 1: idx = subjects
        else:                       idx = (subjects, slice(None))

        data = data.loc[idx, :]

    return data, cols

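
# Illustration only (not part of funpack): a minimal sketch of the subject
# re-ordering performed at the end of loadFiles, using a hypothetical
# dataframe indexed by subject ID and visit.
def _example_subject_reorder():
    index = pd.MultiIndex.from_product([[1, 2, 3], [0, 1]],
                                       names=['eid', 'visit'])
    data  = pd.DataFrame({'value' : range(6)}, index=index)

    # requested subject order, restricted to
    # subjects actually present in the data
    subjects = pd.Index([3, 1, 4], name='eid')
    subjects = subjects.intersection(data.index.levels[0], sort=False)

    # select rows for those subjects, across all visits
    return data.loc[(subjects, slice(None)), :]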
def loadFile(fname,
             fileinfo,
             vartable,
             toload,
             nrows=None,
             subjects=None,
             subjectExprs=None,
             exclude=None,
             indexVisits=False,
             dropNaRows=False,
             trustTypes=False,
             pool=None):
    """Loads data from the specified file. The file is loaded in chunks of
    ``nrows`` rows, using the :func:`loadChunk` function.

    :arg fname:        Path to the data file

    :arg fileinfo:     :class:`.FileInfo` object describing the input file.

    :arg vartable:     Variable table

    :arg toload:       Sequence of :class:`.Column` objects describing the
                       columns that should be loaded, as generated by
                       :func:`columnsToLoad`.

    :arg nrows:        Number of rows to read at a time. Defaults to
                       :attr:`NUM_ROWS`.

    :arg subjects:     List of subjects to include.

    :arg subjectExprs: List of subject inclusion expressions

    :arg exclude:      List of subjects to exclude

    :arg indexVisits:  Re-arrange the data so that rows are indexed by
                       subject ID and visit, rather than visits being split
                       into separate columns. Only applied to variables
                       which are labelled with Instancing 2.

    :arg dropNaRows:   If ``True``, rows which do not contain data for any
                       columns are not loaded.

    :arg trustTypes:   Assume that columns with known data type do not
                       contain any bad/unparseable values.

    :arg pool:         ``multiprocessing.Pool`` object for running tasks in
                       parallel.

    :returns: A tuple containing:

               - A ``pandas.DataFrame`` containing the data.

               - A list of :class:`.Column` objects representing the
                 columns that were loaded. Note that this may be different
                 from ``toload``, if ``indexVisits`` was set.
    """

    ownPool = pool is None
    toload  = list(toload)

    if nrows is None: nrows = NUM_ROWS
    if pool  is None: pool  = mpd.Pool(1)

    # The read_csv function requires the
    # index argument to be specified
    # relative to the usecols argument:
    #
    #  - https://stackoverflow.com/a/45943627
    #  - https://github.com/pandas-dev/pandas/issues/9098
    #  - https://github.com/pandas-dev/pandas/issues/2654
    #
    # So here we make index relative to
    # toload.
    index = fileinfo.index(fname)
    index = [i for i, c in enumerate(toload) if c.index in index]

    # Figure out suitable data types to
    # store the data for each column.
    # pd.read_csv wants the date columns
    # to be specified separately.
    vttypes, dtypes = loadtables.columnTypes(vartable, toload)
    datecols        = [c.name for c, t in zip(toload, vttypes)
                       if t in (util.CTYPES.date, util.CTYPES.time)]

    # If we think there might be bad data
    # in the input (trustTypes is False),
    # only the types for date/time/non-numeric
    # columns are specified during load, and
    # we manually perform numeric conversion
    # after load, via the coerceToNumeric
    # function. This is to avoid pandas.read_csv
    # crashing on bad data - instead, we set bad
    # data to nan.
    if not trustTypes:
        dtypes = {n : t for n, t in dtypes.items()
                  if not np.issubdtype(t, np.number)}

    # input may or may not
    # have a header row
    if fileinfo.header(fname): header = 0
    else:                      header = None

    if indexVisits: indexVisits = reindex.genReindexedColumns(toload, vartable)
    else:           indexVisits = None

    log.debug('Loading %u columns from %s: %s ...',
              len(toload), fname, [c.name for c in toload[:5]])

    # Prepare arguments to
    # the loadChunk function
    allcols  = fileinfo.columns( fname)
    dialect  = fileinfo.dialect( fname)
    encoding = fileinfo.encoding(fname)
    args     = {'fname'        : fname,
                'vartable'     : vartable,
                'header'       : header,
                'allcols'      : allcols,
                'toload'       : toload,
                'index'        : index,
                'nrows'        : nrows,
                'subjects'     : subjects,
                'subjectExprs' : subjectExprs,
                'exclude'      : exclude,
                'indexByVisit' : indexVisits,
                'dropNaRows'   : dropNaRows,
                'encoding'     : encoding,
                'trustTypes'   : trustTypes,
                'dtypes'       : dtypes,
                'datecols'     : datecols}

    if dialect == 'whitespace':
        dlargs = {'sep' : r'\s+'}
    else:
        dlargs = {'delimiter'        : dialect.delimiter,
                  'doublequote'      : dialect.doublequote,
                  'escapechar'       : dialect.escapechar,
                  'skipinitialspace' : dialect.skipinitialspace,
                  'quotechar'        : dialect.quotechar,
                  'quoting'          : dialect.quoting}

    args['dlargs'] = dlargs

    # Load chunks of rows separately,
    # so we can parallelise. We do this
    # by passing different offsets to
    # the loadChunk function.
    totalrows = util.wc(fname)
    offsets   = list(range(0, totalrows, nrows))
    offsets   = [[i, o] for (i, o) in zip(range(len(offsets)), offsets)]

    # just for the log message
    if header:
        totalrows -= 1

    log.debug('Loading %u rows in %u chunks', totalrows, len(offsets))

    func   = ft.partial(loadChunk, **args)
    chunks = pool.starmap(func, offsets)
    chunks = [c for c in chunks if len(c) > 0]

    if len(chunks) > 0: fdata = pd.concat(chunks, axis=0)
    else:               fdata = pd.DataFrame()

    if indexVisits is not None: fcols = indexVisits[0]
    else:                       fcols = toload

    if ownPool:
        pool.close()
        pool.join()

    log.debug('Loaded %i rows from %s', len(fdata), fname)

    return fdata, fcols

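
# Illustration only (not part of funpack): when pandas.read_csv is given a
# usecols filter, an integer index_col refers to a position within the
# *selected* columns, not within the file - which is why loadFile recomputes
# 'index' relative to 'toload' above. A minimal sketch using a hypothetical
# in-memory file:
def _example_index_relative_to_usecols():
    import io
    csv = io.StringIO('eid,a,b,c\n1,10,20,30\n2,40,50,60\n')

    # keep only columns 'b' and 'c' - 'b' is at position 2 in the
    # file, but at position 0 of the selected columns, so index_col=0
    return pd.read_csv(csv, usecols=['b', 'c'], index_col=0)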
def loadChunk(i,
              offset,
              fname,
              vartable,
              header,
              allcols,
              toload,
              index,
              nrows,
              subjects,
              subjectExprs,
              exclude,
              indexByVisit,
              dropNaRows,
              encoding,
              trustTypes,
              dtypes,
              datecols,
              dlargs):
    """Loads a chunk of ``nrows`` from ``fname``, starting at ``offset``.

    :arg i:            Chunk number, just used for logging.

    :arg offset:       Row number to start reading from.

    :arg fname:        Path to the data file

    :arg vartable:     Variable table

    :arg header:       ``True`` if the file has a header row, ``False``
                       otherwise.

    :arg allcols:      Sequence of :class:`.Column` objects describing all
                       columns in the file.

    :arg toload:       Sequence of :class:`.Column` objects describing the
                       columns that should be loaded, as generated by
                       :func:`columnsToLoad`.

    :arg index:        List containing position(s) of index column(s)
                       (starting from 0).

    :arg nrows:        Number of rows to read at a time. Defaults to
                       :attr:`NUM_ROWS`.

    :arg subjects:     List of subjects to include.

    :arg subjectExprs: List of subject inclusion expressions

    :arg exclude:      List of subjects to exclude

    :arg indexByVisit: ``None``, or the return value of
                       :func:`.genReindexedColumns`, which will be used to
                       re-arrange the loaded data so it is indexed by visit
                       number, in addition to row ID.

    :arg dropNaRows:   If ``True``, rows which do not contain data for any
                       columns are not loaded.

    :arg encoding:     Character encoding (or sequence of encodings, one
                       for each data file). Defaults to ``latin1``.

    :arg trustTypes:   Assume that columns with known data type do not
                       contain any bad/unparseable values.

    :arg dtypes:       A dict of ``{ column_name : dtype }`` mappings
                       containing a suitable internal data type to use for
                       some columns.

    :arg datecols:     List of column names denoting columns which should
                       be interpreted as dates/times.

    :arg dlargs:       Dict of arguments to pass through to
                       ``pandas.read_csv``.

    :returns:          ``pandas.DataFrame``
    """

    if encoding is None:
        encoding = 'latin1'

    allcolnames = [c.name for c in allcols]
    toloadnames = [c.name for c in toload]

    def shouldLoad(c):
        return c in toloadnames

    with warnings.catch_warnings():
        warnings.filterwarnings('ignore', module='pandas.io.parsers')
        warnings.filterwarnings('ignore', category=pd.errors.DtypeWarning)
        df = pd.read_csv(fname,
                         header=header,
                         names=allcolnames,
                         index_col=index,
                         dtype=dtypes,
                         usecols=shouldLoad,
                         parse_dates=datecols,
                         skiprows=offset,
                         nrows=nrows,
                         encoding=encoding,
                         **dlargs)

    gotrows = len(df)

    # drop NA rows if requested
    if dropNaRows:
        df.dropna(how='all', inplace=True)

    # If a subject/expression/exclude list
    # is provided, filter the rows accordingly
    df = filter.filterSubjects(df, toload, subjects, subjectExprs, exclude)

    log.debug('Processing chunk %i (kept %i / %i rows)',
              i + 1, len(df), gotrows)

    # re-arrange the data so that visits
    # form part of the index, rather than
    # being stored in separate columns for
    # each variable
    cols = toload
    if indexByVisit is not None:
        df   = reindex.reindexByVisit(df, cols, *indexByVisit)
        cols = indexByVisit[0]

    # If not trustTypes, we manually convert
    # each column to its correct type.
    #
    # We have to do this after load, as
    # pd.read_csv will raise an error if
    # a column that is specified as
    # numeric contains non-numeric data.
    # So we coerce data types after the
    # data has been loaded. This causes
    # non-numeric data to be set to nan.
    if not trustTypes:
        for c in cols:
            if c.vid == 0:
                continue
            df[c.name] = coerceToNumeric(vartable, df[c.name], c)

    return df

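
# Illustration only (not part of funpack): pandas.read_csv also accepts a
# callable for usecols - it is called once per column name, and a column is
# loaded only if the callable returns True. This is how the shouldLoad
# function in loadChunk restricts the load to the requested columns. A
# minimal sketch using a hypothetical in-memory file:
def _example_usecols_callable():
    import io
    csv  = io.StringIO('eid,a,b\n1,10,20\n2,30,40\n')
    keep = {'eid', 'b'}
    return pd.read_csv(csv, usecols=lambda name: name in keep)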
def coerceToNumeric(vartable, series, column):
    """Coerces the given column to numeric, if necessary.

    :arg vartable: The variable table

    :arg series:   ``pandas.Series`` containing the data to be coerced.

    :arg column:   :class:`.Column` object representing the column to
                   coerce.

    :returns:      Coerced ``pandas.Series``
    """

    name      = column.name
    dtype     = loadtables.columnTypes(vartable, [column])[1]
    has_dtype = series.dtype
    exp_dtype = dtype.get(name, None)

    if (exp_dtype is not None)             and \
       np.issubdtype(exp_dtype, np.number) and \
       (has_dtype != exp_dtype):

        # We can't force a specific numpy
        # dtype *and* coerce bad values to
        # nan in one step. So we do it in
        # two steps: to_numeric handles
        # coercion to NaN, and astype casts
        # to the exact type.
        s = pd.to_numeric(series, errors='coerce')
        s = s.astype(exp_dtype, copy=False)
        return s

    return series

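
# Illustration only (not part of funpack): the two-step numeric coercion used
# by coerceToNumeric above, applied to a hypothetical series containing an
# unparseable value. to_numeric turns the bad value into NaN, and astype then
# casts to the exact target dtype.
def _example_two_step_coercion():
    series = pd.Series(['1', '2', 'bad', '4'])
    series = pd.to_numeric(series, errors='coerce')  # 'bad' becomes NaN
    return series.astype(np.float32, copy=False)     # cast to exact dtype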