from .regressors import Event, Confound, Intercept
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn import linear_model
import scipy as sp
from .plotting import plot_timecourses, plot_design_matrix
from nilearn import input_data, image
from nilearn._utils import load_niimg
from .utils import get_time_to_peak_from_timecourse
[docs]class ResponseFitter(object):
"""ResponseFitter takes an input signal and performs deconvolution on it.
To do this, it requires event times, and possible covariates.
ResponseFitter can, for each event type, use different basis function sets,
see Event."""
def __init__(self,
input_signal,
sample_rate,
oversample_design_matrix=20,
add_intercept=True, **kwargs):
""" Initialize a ResponseFitter object.
Parameters
----------
input_signal : numpy array, dimensions preferably (X, n)
input data, of X timeseries of n timepoints
sampled at the frequency at which we would
like to conduct this analysis
sample_rate : float
frequency in Hz at which input data are sampled
**kwargs : dict
keyward arguments to be internalized by the ResponseFitter object
"""
super(ResponseFitter, self).__init__()
self.__dict__.update(kwargs)
self.sample_rate = sample_rate
self.sample_duration = 1.0/self.sample_rate
self.oversample_design_matrix = oversample_design_matrix
self.input_signal_time_points = np.linspace(0,
input_signal.shape[0] * self.sample_duration,
input_signal.shape[0],
endpoint=False)
self.input_signal = pd.DataFrame(input_signal)
self.input_signal.index = pd.Index(self.input_signal_time_points,
name='time')
self.X = pd.DataFrame(index=self.input_signal.index)
if add_intercept:
self.add_intercept()
self.events = {}
def add_intercept(self, name='intercept'):
intercept = Intercept(name, self)
self._add_regressor(intercept)
[docs] def add_confounds(self, name, confound):
"""Add a timeseries or set of timeseries to the general
design matrix as a confound
Parameters
----------
confound : array
Confound of (n_timepoints) or (n_timepoints, n_confounds)
"""
confound = Confound(name, self, confound)
self._add_regressor(confound)
def _add_regressor(self, regressor, oversample=None):
if oversample is None:
oversample = self.oversample_design_matrix
regressor.create_design_matrix(oversample=oversample)
if self.X.shape[1] == 0:
self.X = pd.concat((regressor.X, self.X), 1)
else:
self.X = pd.concat((self.X, regressor.X), 1)
[docs] def add_event(self,
event_name,
onsets=None,
basis_set='fir',
interval=[0,10],
n_regressors=None,
durations=None,
covariates=None,
**kwargs):
"""
create design matrix for a given event_type.
Parameters
----------
event_name : string
Name of the event_type, used as key to lookup this event_type's
characteristics
**kwargs : dict
keyward arguments to be internalized by the generated and
internalized Event object. Needs to consist of the
necessary arguments to create an Event object,
see Event constructor method.
"""
assert event_name not in self.X.columns.get_level_values(0), "The event_name %s is already in use" % event_name
ev = Event(name=event_name,
onsets=onsets,
basis_set=basis_set,
interval=interval,
n_regressors=n_regressors,
durations=durations,
covariates=covariates,
fitter=self,
**kwargs)
self._add_regressor(ev)
self.events[event_name] = ev
[docs] def fit(self, type='ols', cv=20, alphas=None, store_residuals=False):
"""Regress a created design matrix on the input_data.
Creates internal variables betas, residuals, rank and s.
The beta values are then injected into the event_type objects the
ResponseFitter contains.
Parameters
----------
type : string, optional
the type of fit to be done. Options are 'ols' for np.linalg.lstsq,
'ridge' for CV ridge regression.
"""
if type == 'ols':
n, p = self.X.shape
self.betas, self.ssquares, self.rank, self.s = \
np.linalg.lstsq(self.X, self.input_signal, rcond=-1)
self._send_betas_to_regressors()
if self.rank < p:
raise Exception('Design matrix is singular. Consider using less '
'regressors, basis functions, or try ridge regression.')
self.sigma2 = self.ssquares / (n -(p+1))
self.sigma2 = pd.Series(self.sigma2, index=self.input_signal.columns)
if store_residuals:
prediction = self.X.dot(self.betas)
self._residuals = self.input_signal - prediction
elif type == 'ridge': # betas and residuals are internalized by ridge_regress
if self.input_signal.shape[1] > 1:
raise NotImplementedError('No support for multidimensional signals yet')
self.ridge_regress(cv=cv, alphas=alphas, store_residuals=store_residuals)
def get_standard_errors_timecourse(self, melt=False, oversample=None):
self._check_fitted()
c = self.get_basis_functions(oversample=oversample)
X_= np.linalg.pinv(self.X.T.dot(self.X))
sem = np.sqrt((c.values.dot(X_) * c).sum(1).values[:, np.newaxis] * self.sigma2[np.newaxis, :])
sem = pd.DataFrame(sem, index=c.index, columns=self.sigma2.index)
if melt:
sem = sem.reset_index().melt(id_vars=['event type',
'covariate',
'time'],
var_name='roi')
return sem
def get_t_value_timecourses(self,
oversample=None,
melt=False):
tc = self.get_timecourses(oversample=oversample,
melt=melt)
sem = self.get_standard_errors_timecourse(melt=melt,
oversample=oversample)
t = tc / sem
t = pd.concat([t], keys=['t'], names=['stat'], axis=1)
return t
[docs] def ridge_regress(self, cv=20, alphas=None, store_residuals=False):
"""
run CV ridge regression instead of ols fit. Uses sklearn's RidgeCV class
Parameters
----------
cv : int
number of cross-validation folds
alphas : np.array
the alpha/lambda values to try out in the CV ridge regression
"""
if alphas is None:
alphas = np.logspace(7, 0, 20)
self.rcv = linear_model.RidgeCV(alphas=alphas,
fit_intercept=False,
cv=cv)
self.rcv.fit(self.X, self.input_signal)
self.betas = self.rcv.coef_.T
if store_residuals:
self._residuals = self.input_signal - self.rcv.predict(self.X)
self.ssquares = np.sum(self._residuals**2)
else:
self.ssquares = np.sum((self.input_signal - self.rcv.predict(self.X))**2)
self._send_betas_to_regressors()
def _send_betas_to_regressors(self):
self.betas = pd.DataFrame(self.betas,
index=self.X.columns,
columns=self.input_signal.columns)
for key in self.events:
self.events[key].betas = self.betas.loc[[key]]
[docs] def predict_from_design_matrix(self,
X=None,
melt=False):
"""
predict a signal given a design matrix. Requires regression to have
been run.
Parameters
----------
X : np.array, (timepoints, n_regressors)
the design matrix for which to predict data.
"""
# check if we have already run the regression - which is necessary
if X is None:
X = self.X
assert hasattr(self, 'betas'), 'no betas found, please run regression before prediction'
assert X.shape[1] == self.betas.shape[0], \
"""designmatrix needs to have the same number of regressors
as the betas already calculated"""
prediction = self.X.dot(self.betas)
if melt:
prediction = prediction.reset_index()\
.melt(var_name='roi',
value_name='prediction',
id_vars='time')
else:
prediction.columns = ['prediction for %s' % c for c in prediction.columns]
return prediction
def get_basis_functions(self, oversample=None):
if oversample is None:
oversample = self.oversample_design_matrix
names = ['event type', 'covariate', 'time']
row_index = pd.MultiIndex(names=names,
levels=[[], [], []],
labels=[[], [], []])
bf = pd.DataFrame(columns=self.X.columns, index=row_index)
for event_type, event in self.events.items():
for covariate in event.covariates.columns:
ev = event.get_basis_function(oversample=oversample)
ev.index = pd.MultiIndex.from_product([[event_type], [covariate], ev.index.get_values()],
names=names)
ev.columns = pd.MultiIndex.from_product([[event_type], [covariate], ev.columns.get_values()],
names=['event type', 'covariate', 'regressor'])
bf = pd.concat((bf, ev), axis=0, )
bf.fillna(0, inplace=True)
return bf
def get_timecourses(self,
oversample=None,
melt=False):
assert hasattr(self, 'betas'), 'no betas found, please run regression before prediction'
if oversample is None:
oversample = self.oversample_design_matrix
timecourses = pd.DataFrame()
for event_type in self.events:
tc = self.events[event_type].get_timecourses(oversample=oversample)
timecourses = pd.concat((timecourses, tc), ignore_index=False)
if melt:
timecourses = timecourses.reset_index().melt(id_vars=['event type',
'covariate',
'time'],
var_name='roi')
return timecourses
def plot_timecourses(self,
oversample=None,
legend=True,
*args,
**kwargs):
if oversample is None:
oversample = 1
tc = self.get_timecourses(melt=True,
oversample=oversample)
tc['subject'] = 'dummy'
return plot_timecourses(tc,
oversample=oversample,
legend=legend,
*args, **kwargs)
[docs] def get_rsq(self):
"""
calculate the rsq of a given fit.
calls predict_from_design_matrix to predict the signal that has been fit
"""
assert hasattr(self, 'betas'), \
'no betas found, please run regression before rsq'
rsq = 1 - (self.ssquares /
((self.input_signal.values - self.input_signal.mean().values)**2).sum(0))
return pd.DataFrame(rsq[np.newaxis, :], columns=self.input_signal.columns)
def get_residuals(self):
if not hasattr(self, '_residuals'):
return self.input_signal - self.predict_from_design_matrix().values
else:
return self._residuals
[docs] def get_epochs(self, onsets, interval, remove_incomplete_epochs=True):
"""
Return a matrix corresponding to specific onsets, within a given
interval. Matrix size is (n_onsets, n_timepoints_within_interval).
Note that any events that are in the ResponseFitter-object will
be regressed out before calculating the epochs.
"""
# If no other events are defined, no need to regress them out
if self.X.shape[1] == 0:
signal = self.input_signal
else:
self.fit(store_residuals=True)
signal = self._residuals
onsets = np.array(onsets)
indices = np.array([signal.index.get_loc(onset, method='nearest') for onset in onsets + interval[0]])
interval_duration = interval[1] - interval[0]
interval_n_samples = int(interval_duration * self.sample_rate) + 1
indices = np.tile(indices[:, np.newaxis], (1, interval_n_samples)) + np.arange(interval_n_samples)
# Set elements in epochs that fall out of time series to nan
indices[indices >= signal.shape[0]] = -1
# Make dummy element to fill epochs with nans if they fall out of the timeseries
signal = pd.concat((signal, pd.DataFrame(np.zeros((1, signal.shape[1])) * np.nan,
columns=signal.columns,
index=[np.nan])),
0)
# Calculate epochs
epochs = signal.values[indices].swapaxes(-1, -2)
epochs = epochs.reshape((epochs.shape[0], np.prod(epochs.shape[1:])))
columns = pd.MultiIndex.from_product([signal.columns,
np.linspace(interval[0], interval[1], interval_n_samples)],
names=['roi', 'time'])
epochs = pd.DataFrame(epochs,
columns=columns,
index=pd.Index(onsets, name='onset'))
# Get rid of incomplete epochs:
if remove_incomplete_epochs:
epochs = epochs[~epochs.isnull().any(1)]
return epochs
def get_time_to_peak(self,
oversample=None,
cutoff=1.0,
negative_peak=False,
include_prominence=False):
if oversample is None:
oversample = self.oversample_design_matrix
if include_prominence:
ix = ['time peak', 'prominence']
else:
ix = ['time peak']
return self.get_timecourses(oversample=oversample)\
.groupby(['event type', 'covariate'])\
.apply(get_time_to_peak_from_timecourse,
negative_peak=negative_peak,
cutoff=cutoff)\
.loc[(slice(None), slice(None), ix), :]
def get_original_signal(self, melt=False):
if melt:
return self.input_signal.reset_index()\
.melt(var_name='roi',
value_name='signal',
id_vars='time')
else:
return self.input_signal
def plot_model_fit(self,
xlim=None,
legend=True):
n_rois = self.input_signal.shape[1]
if n_rois > 24:
raise Exception('Are you sure you want to plot {} areas?!'.format(n_rois))
signal = self.get_original_signal(melt=True)
prediction = self.predict_from_design_matrix(melt=True)
data = signal.merge(prediction)
if n_rois < 4:
col_wrap = n_rois
else:
col_wrap = 4
fac = sns.FacetGrid(data,
col='roi',
col_wrap=col_wrap,
aspect=3)
fac.map(plt.plot, 'time', 'signal', color='k', label='signal')
fac.map(plt.plot, 'time', 'prediction', color='r', ls='--', lw=3, label='prediction')
if xlim is not None:
for ax in fac.axes.ravel():
ax.set_xlim(*xlim)
if legend:
fac.add_legend()
fac.set_ylabels('signal')
fac.set_titles('{col_name}')
return fac
def _check_fitted(self):
assert hasattr(self, 'betas'), \
'no betas found, please run regression before rsq'
def plot_design_matrix(self, palette=None):
return plot_design_matrix(self.X,
palette=palette)
class ConcatenatedResponseFitter(ResponseFitter):
def __init__(self, response_fitters):
self.response_fitters = response_fitters
self.X = pd.concat([rf.X for rf in self.response_fitters]).fillna(0)
for attr in ['sample_rate', 'oversample_design_matrix']:
check_properties_response_fitters(self.response_fitters, attr)
setattr(self, attr, getattr(self.response_fitters[0], attr))
self.input_signal = pd.concat([rf.input_signal for rf in self.response_fitters])
self.events = {}
for rf in self.response_fitters:
self.events.update(rf.events)
def add_intercept(self, *args, **kwargs):
raise Exception('ConcatenatedResponseFitter does not allow for adding'\
'intercepts anymore. Do this in the original response '\
'fitters that get concatenated')
def add_confounds(self, *args, **kwargs):
raise Exception('ConcatenatedResponseFitter does not allow for adding '\
'confounds. Do this in the original response '\
'fytters that get concatenated')
def add_event(self, *args, **kwargs):
raise Exception('ConcatenatedResponseFitter does not allow for adding'\
'events.')
def plot_timecourses(self,
oversample=None,
*args,
**kwargs):
tc = self.get_timecourses(melt=True,
oversample=oversample)
tc['subject'] = 'dummy'
plot_timecourses(tc, *args, **kwargs)
def get_epochs(self, onsets, interval, remove_incomplete_epochs=True):
raise NotImplementedError()
def check_properties_response_fitters(response_fitters, attribute):
attribute_values = [getattr(rf, attribute) for rf in response_fitters]
assert(all([v == attribute_values[0] for v in attribute_values])), "%s not equal across response fitters!" % attribute