from .response_fitter import ResponseFitter
from nilearn._utils import load_niimg
from nilearn import input_data, image
import pandas as pd
import numpy as np
import logging
from .utils import get_time_to_peak_from_timecourse
[docs]class NiftiResponseFitter(ResponseFitter):
def __init__(self,
func_img,
sample_rate,
mask=None,
oversample_design_matrix=20,
add_intercept=True,
detrend=False,
standardize=False,
confounds_for_extraction=None,
memory=None,
**kwargs):
if isinstance(confounds_for_extraction, pd.DataFrame):
confounds_for_extraction = confounds_for_extraction.values
self.confounds = confounds_for_extraction
if isinstance(mask, input_data.NiftiMasker):
self.masker = mask
else:
if mask is None:
logging.warn('No mask has been given. Nilearn will automatically try to'\
'make one')
else:
mask = load_niimg(mask)
self.masker = input_data.NiftiMasker(mask,
detrend=detrend,
standardize=standardize,
memory=memory)
input_signal = self.masker.fit_transform(func_img,
confounds=confounds_for_extraction)
self.n_voxels = input_signal.shape[1]
super(NiftiResponseFitter, self).__init__(input_signal=input_signal,
sample_rate=sample_rate,
oversample_design_matrix=oversample_design_matrix,
add_intercept=add_intercept,
**kwargs)
[docs] def ridge_regress(self, *args, **kwargs):
raise NotImplementedError('Not implemented for NiftiResponseFitter')
[docs] def predict_from_design_matrix(self, X=None):
prediction = super(NiftiResponseFitter, self).predict_from_design_matrix(X)
return self._inverse_transform(prediction)
def get_timecourses(self,
oversample=None,
average_over_mask=False,
transform_to_niftis=True,
**kwargs
):
if len(self.events) is 0:
raise Exception("No events were added")
timecourses = super(NiftiResponseFitter, self).get_timecourses(oversample=oversample,
melt=False,
**kwargs)
if transform_to_niftis:
if average_over_mask:
average_over_mask = load_niimg(average_over_mask)
weights = image.math_img('mask / mask.sum()',
mask=average_over_mask)
weights = self.masker.fit_transform(weights)
timecourses = timecourses.dot(weights.T)
return timecourses.sum(1)
else:
tc_df = []
for (event_type, covariate, time), tc in timecourses.groupby(level=['event type', 'covariate', 'time']):
#timepoints = tc.index.get_level_values('time')
tc_nii = self._inverse_transform(tc)
tc = pd.DataFrame([tc_nii], index=pd.MultiIndex.from_tuples([(event_type, covariate, time)],
names=['event type', 'covariate', 'time']),
columns=['nii'])
tc_df.append(tc)
return pd.concat(tc_df)
else:
return timecourses
def _inverse_transform(self,
data):
return self.masker.inverse_transform(data)
def get_residuals(self):
residuals = image.math_img('data - prediction',
data=self._inverse_transform(self.input_signal),
prediction=self.predict_from_design_matrix())
return residuals
[docs] def get_rsq(self):
"""
calculate the rsq of a given fit.
calls predict_from_design_matrix to predict the signal that has been fit
"""
return self._inverse_transform(super().get_rsq())
def get_time_to_peak(self,
oversample=None,
negative_peak=False,
include_prominence=False):
if oversample is None:
oversample = self.oversample_design_matrix
if include_prominence:
ix = ['time peak', 'prominence']
else:
ix = ['time peak']
peaks = self.get_timecourses(oversample=oversample,
transform_to_niftis=False)\
.groupby(['event type', 'covariate'])\
.apply(get_time_to_peak_from_timecourse,
negative_peak=negative_peak)
return peaks.apply(lambda d: self._inverse_transform(d), 1)\
.to_frame('nii').loc[(slice(None), slice(None), ix), :]
class GroupNiftiResponseFitter(object):
def __init__(self,
oversample_design_matrix=20,
detrend=False,
standardize=False,
memory=None,
add_intercept=True,
**kwargs):
self.oversample_design_matrix = oversample_design_matrix
self.add_intercept = add_intercept
self.detrend = detrend
self.standardize = standardize
self.memory = memory
self.events = []
self.images = []
def add_event(self,
event=None,
basis_set='fir',
interval=[0,10],
n_regressors=None,
covariates=None,
add_intercept=True,
**kwargs):
event_kws = {'event':event,
'basis_set':basis_set,
'interval':interval,
'n_regressors':n_regressors,
'covariates':covariates,
'add_intercept':add_intercept
}
event_kws.update(kwargs)
self.events.append(event_kws)
def add_image(self,
func_img,
behavior,
sample_rate,
subj_idx,
run=None,
mask=None,
confounds=None):
image_kws = {'func_img':func_img,
'behavior':behavior,
'subj_idx':subj_idx,
'run':run,
'mask':mask,
'confounds':confounds,
'sample_rate':sample_rate}
self.images.append(image_kws)
def fit(self, keep_timeseries=False):
self.timecourses = pd.DataFrame()
self.fitters = pd.DataFrame([],
index=pd.MultiIndex.from_tuples([], names=['subj_idx', 'run']),
columns=['fitter'])
for image in self.images:
logging.info('Fitting subject {subj_idx}, run {run}'.format(**image))
fitter = NiftiResponseFitter(image['func_img'],
image['sample_rate'],
image['mask'],
self.oversample_design_matrix,
self.add_intercept,
self.detrend,
self.standardize,
memory=self.memory)
if image['confounds'] is not None:
fitter.add_confounds('confounds', image['confounds'].copy())
for event in self.events:
behavior = image['behavior']
behavior = behavior[behavior.trial_type == event['event']]
if event['covariates'] is None:
covariate_matrix = None
else:
covariate_matrix = behavior[event['covariates']]
if event['add_intercept']:
intercept_matrix = pd.DataFrame(np.ones((len(covariate_matrix), 1)),
columns=['intercept'],
index=covariate_matrix.index)
covariate_matrix = pd.concat((intercept_matrix, covariate_matrix), 1)
if 'duration' in behavior and np.isfinite(behavior['duration']).all():
durations = behavior['duration']
else:
durations = None
fitter.add_event(event['event'],
behavior.onset,
event['basis_set'],
event['interval'],
event['n_regressors'],
durations,
covariate_matrix)
fitter.regress()
tc = fitter.get_timecourses()
if not keep_timeseries:
tc.input_signal = None
self.fitters.loc[(image['subj_idx'], image['run']), 'fitter'] = fitter
tc['subj_idx'] = image['subj_idx']
tc['run'] = image['run']
tc.set_index(['subj_idx', 'run'])
self.timecourses = pd.concat((self.timecourses, tc), 0)
self.timecourses = self.timecourses.reset_index().set_index(['subj_idx', 'run', 'event type', 'covariate', 'time'])
def get_timecourses(self, mask=None, names=None,
event_types=None,
covariates=None,
resampling_target=None):
if mask is None:
return self.timecourses
if type(mask) is not list:
mask = [mask]
if event_types is None:
event_types = slice(None)
if covariates is None:
covariates = slice(None)
if names:
names = pd.Index(names, name='roi')
masker = input_data.NiftiMapsMasker(mask, resampling_target=resampling_target)
masker.fit()
ix = pd.IndexSlice
timecourses = self.timecourses.loc[ix[:, :,event_types, covariates, :], :]
logging.info('Concatenating...')
concat_niis = image.concat_imgs(timecourses.nii.tolist())
logging.info('Masking...')
timecourses = pd.DataFrame(masker.transform(concat_niis),
index=timecourses.index,
columns=names)
return timecourses
def get_time_to_peak(self,
oversample=None,
cutoff=1.0,
negative_peak=False,
include_prominence=False):
if oversample is None:
oversample = self.oversample_design_matrix
if include_prominence:
ix = ['time peak', 'prominence']
else:
ix = ['time peak']
peaks = self.get_timecourses(oversample=oversample,
transform_to_niftis=False)\
.groupby(['event type', 'covariate'])\
.apply(get_time_to_peak_from_timecourse,
negative_peak=negative_peak,
cutoff=cutoff)\
.loc[(slice(None), slice(None), ix), :]
return peaks.groupby(['event type', 'covariate']) \
.apply(lambda d: self._inverse_transform(d), 1)\
.to_frame('nii')