import sys
import pandas as pd
from .util import stderr
[docs]
def _read_column_file_groups(paths, sep):
    """Read each dataset path into a list of tables, one per ``;``-delimited column file."""
    datasets = []
    for path in paths:
        assert path is not None, 'No data path provided. Exiting.'
        datasets.append(
            [pd.read_csv(p, sep=sep, skipinitialspace=True) for p in path.split(';')]
        )
    return datasets


def _concat_by_column_file(datasets):
    """Row-concatenate the j-th column file of every dataset into a single table per j."""
    grouped = []
    for dataset in datasets:
        for j, table in enumerate(dataset):
            # Datasets may have different numbers of column files; grow on demand.
            if j == len(grouped):
                grouped.append([])
            grouped[j].append(table)
    return [pd.concat(tables, axis=0) for tables in grouped]


def read_tabular_data(X_paths, Y_paths, series_ids, categorical_columns=None, sep=' ', verbose=True):
    """
    Read impulse and response data into pandas dataframes and perform basic pre-processing.

    Pre-processing consists of: row-wise concatenation of the datasets, sorting each table by
    ``series_ids`` and the ``time`` column, casting categorical columns, building interaction
    columns in the response tables for ``:``-delimited categorical names, and adding the
    ``rate`` (constant 1.) and, if absent, ``trial`` (running count per series) columns to
    each impulse table.

    :param X_paths: ``str`` or ``list`` of ``str``; path(s) to impulse (predictor) data (multiple tables are
        concatenated). Each path may also be a ``;``-delimited list of paths to files containing predictors with
        different timestamps, where the predictors in each file are all timestamped with respect to the same
        reference point.
    :param Y_paths: ``str`` or ``list`` of ``str``; path(s) to response data (multiple tables are concatenated).
        Each path may also be a ``;``-delimited list of paths to files containing different response variables
        with different timestamps, where the response variables in each file are all timestamped with respect
        to the same reference point.
    :param series_ids: ``list`` of ``str``; column names whose jointly unique values define unique time series.
        A single ``str`` is treated as a one-element list and ``None`` as an empty list.
    :param categorical_columns: ``list`` of ``str``; column names that should be treated as categorical. A name
        of the form ``a:b`` casts ``a`` and ``b`` to categorical and adds a string column ``a:b`` to every
        response table.
    :param sep: ``str``; string representation of field delimiter in input data.
    :param verbose: ``bool``; whether to log progress to stderr.
    :return: 2-tuple of list(``pandas`` DataFrame); (impulse data, response data). X and Y each have one
        element per column file position, each containing the row-wise concatenation of that column file
        across all datasets in X_paths/Y_paths.
    :raises AssertionError: if a path is ``None``, if an impulse table already contains a ``rate`` column, or
        if a member of a categorical interaction is missing from a response table.
    """

    if not isinstance(X_paths, list):
        X_paths = [X_paths]
    if not isinstance(Y_paths, list):
        Y_paths = [Y_paths]

    # Normalize series_ids so that list concatenation and groupby below are well-defined
    # for None, a bare column name, or any other iterable of column names.
    if series_ids is None:
        series_ids = []
    elif isinstance(series_ids, str):
        series_ids = [series_ids]
    else:
        series_ids = list(series_ids)

    if verbose:
        stderr('Loading data...\n')

    X_datasets = _read_column_file_groups(X_paths, sep)
    Y_datasets = _read_column_file_groups(Y_paths, sep)

    # Regroup by column file: one concatenated table per column file position
    X = _concat_by_column_file(X_datasets)
    Y = _concat_by_column_file(Y_datasets)

    # Sort
    if verbose:
        stderr('Ensuring sort order...\n')
    sort_keys = series_ids + ['time']
    X = [x.sort_values(sort_keys).reset_index(drop=True) for x in X]
    Y = [y.sort_values(sort_keys).reset_index(drop=True) for y in Y]

    # Process categorical
    if categorical_columns is not None:
        for t in categorical_columns:
            split = t.split(':')
            for col in split:
                for table in X + Y:
                    if col in table:
                        table[col] = table[col].astype('category')
            if len(split) > 1:
                # Interaction term: join the string values of its members with ':'
                for _Y in Y:
                    new_col = None
                    for col in split:
                        assert col in _Y, 'Members of categorical interaction grouping indices must all be present in every response table.'
                        if new_col is None:
                            new_col = _Y[col].astype(str)
                        else:
                            new_col = new_col + ':' + _Y[col].astype(str)
                    _Y[t] = new_col

    # Add columns to X
    for _X in X:
        assert 'rate' not in _X, '"rate" is a reserved column name in CDR. Rename your input column...'
        _X['rate'] = 1.
        if 'trial' not in _X:
            # Running count of rows, restarted for each series when series ids are given
            if series_ids:
                _X['trial'] = _X.groupby(series_ids)['rate'].cumsum()
            else:
                _X['trial'] = _X['rate'].cumsum()

    return X, Y