# Imports
import os
import cv2
import glob
import h5py
import time
import random
import urllib.request
import pickle
import argparse
import numpy as np
import nibabel as nib
import scipy.io as sio
import matplotlib.pyplot as plt
from tqdm import tqdm
from PIL import Image
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA, IncrementalPCA
import torch
import torch.nn as nn
import torch.utils.model_zoo as model_zoo
from torch.autograd import Variable as V
from torchvision import transforms as trn


# @title Load mat files
def loadmat(matfile):
    """Function to load .mat files.

    Parameters
    ----------
    matfile : str
        path to `matfile` containing fMRI data for a given trial.

    Returns
    -------
    dict
        dictionary containing data in key 'vol' for a given trial.
    """
    try:
        f = h5py.File(matfile)
    except (IOError, OSError):
        return sio.loadmat(matfile)
    else:
        return {name: np.transpose(f.get(name)) for name in f.keys()}


# @title Data download and unzip in `data/`
import os, zipfile, requests

fname = 'data.zip'
url = "https://osf.io/7vpyh/download"

if not os.path.isfile(fname):
    try:
        r = requests.get(url)
    except requests.ConnectionError:
        print("!!! Failed to download data !!!")
    else:
        if r.status_code != requests.codes.ok:
            print("!!! Failed to download data !!!")
        else:
            print("Downloading data...")
            with open(fname, "wb") as fid:
                fid.write(r.content)
            with zipfile.ZipFile(fname, 'r') as zip_ref:
                zip_ref.extractall('data/')
            print("Download completed!")

# Load MEG RDMs at each time point, for all subjects and all sessions
MEG_RDMs = loadmat("data/MEG_decoding_RDMs.mat")['MEG_decoding_RDMs']
print(MEG_RDMs.shape)

# Average the RDMs across subjects and sessions
MEG_RDM_sub_averaged = np.mean(MEG_RDMs, axis=(0, 1))
del MEG_RDMs

# @title Visualize subject-averaged MEG RDMs
timepoint = 420  # @param {type:"slider", min:-100, max:600, step:20}

# Load the RDM at a given timepoint
# +100 because the first RDM corresponds to -100 ms relative to stimulus onset
RDM = np.array(MEG_RDM_sub_averaged[timepoint + 100])

# Since the matrix is symmetric, we set the upper triangular values to NaN
RDM[np.triu_indices(RDM.shape[0], 1)] = np.nan

# Plot the RDM at the chosen timepoint
plt.imshow(RDM, cmap="bwr")
plt.title("MEG RDM at t = " + str(timepoint) + " ms")
cbar = plt.colorbar()
plt.xlabel("Stimuli")
plt.ylabel("Stimuli")
cbar.ax.get_yaxis().labelpad = 15
cbar.ax.set_ylabel('Decoding Accuracy', rotation=270)
plt.show()

fMRI_file = 'data/92_Image_Set/target_fmri.mat'  # path to the fMRI RDM file
fMRI_RDMs = loadmat(fMRI_file)  # load the fMRI RDMs
print(fMRI_RDMs.keys())
print(fMRI_RDMs['EVC_RDMs'].shape)

# @title Visualize subject-averaged fMRI RDMs
ROI = 'EVC'  # @param ["EVC", "IT"]

# Average the ROI RDM across subjects
RDM = np.array(fMRI_RDMs[ROI + '_RDMs'].mean(axis=0))

# Since the matrix is symmetric, we set the upper triangular values to NaN
RDM[np.triu_indices(RDM.shape[0], 1)] = np.nan

# Plot the ROI RDM
plt.imshow(RDM, cmap="bwr")
plt.title(ROI + " RDM")
cbar = plt.colorbar()
plt.xlabel("Stimuli")
plt.ylabel("Stimuli")
cbar.ax.get_yaxis().labelpad = 15
cbar.ax.set_ylabel('1-Correlation', rotation=270)
plt.show()
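# @title (Optional) Sanity check: RDM properties
# A minimal illustrative sketch (not part of the original tutorial): the analyses
# below assume the RDMs are square and symmetric, so we quickly report these
# properties for the loaded MEG and fMRI RDMs. The helper name `check_rdm` is
# hypothetical and only used here.
def check_rdm(rdm, name):
    rdm = np.asarray(rdm)
    is_square = rdm.shape[0] == rdm.shape[1]
    is_symmetric = np.allclose(rdm, rdm.T, equal_nan=True)
    print(f"{name}: shape={rdm.shape}, square={is_square}, symmetric={is_symmetric}")

check_rdm(MEG_RDM_sub_averaged[0], "MEG RDM (first timepoint)")
check_rdm(fMRI_RDMs['EVC_RDMs'].mean(axis=0), "EVC RDM (subject-averaged)")
check_rdm(fMRI_RDMs['IT_RDMs'].mean(axis=0), "IT RDM (subject-averaged)")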
# @title RDM comparison functions
from scipy.stats import spearmanr

def RSA_spearman(rdm1, rdm2):
    """Computes and returns the Spearman correlation between the lower
    triangular parts of the input RDMs.

    Since an RDM is symmetric, we only need to compare either the lower or the
    upper triangular part of the matrix.
    """
    # get lower triangular part of rdm1
    lt_rdm1 = get_lowertriangular(rdm1)
    # get lower triangular part of rdm2
    lt_rdm2 = get_lowertriangular(rdm2)
    # return Spearman's correlation between the lower triangular parts of rdm1 & rdm2
    return spearmanr(lt_rdm1, lt_rdm2)[0]

def get_lowertriangular(rdm):
    """Returns the lower triangular part of the matrix (diagonal excluded)."""
    num_conditions = rdm.shape[0]
    return rdm[np.tril_indices(num_conditions, -1)]

# @title Correlating MEG RDMs with fMRI RDMs
num_timepoints = MEG_RDM_sub_averaged.shape[0]  # get number of timepoints

# initialize a dictionary to store the MEG-ROI RDM correlation at each timepoint
MEG_correlation = {}
ROIs = ['EVC', 'IT']
for ROI in ROIs:
    MEG_correlation[ROI] = []

# loop over the MEG RDMs at all timepoints and correlate each with the ROI RDMs
for t in range(num_timepoints):
    MEG_RDM_t = MEG_RDM_sub_averaged[t, :, :]
    for ROI in ROIs:
        ROI_RDM = np.mean(fMRI_RDMs[ROI + '_RDMs'], axis=0)
        MEG_correlation[ROI].append(RSA_spearman(ROI_RDM, MEG_RDM_t))

# @title Plotting MEG-fMRI comparison
plt.rc('font', size=12)
fig, ax = plt.subplots(figsize=(10, 6))
# RDM index 0 corresponds to -100 ms relative to stimulus onset
time_range = np.arange(num_timepoints) - 100
ax.plot(time_range, MEG_correlation['IT'], color='tab:orange', label='IT')
ax.plot(time_range, MEG_correlation['EVC'], color='tab:blue', label='EVC')
ax.set_xlabel('Time (ms)')
ax.set_ylabel("Spearman's correlation")
ax.set_title('MEG-fMRI fusion')
ax.grid(True)
ax.legend(loc='upper left')
fig.show()

# @title AlexNet Definition
__all__ = ['AlexNet', 'alexnet']

model_urls = {
    'alexnet': 'https://download.pytorch.org/models/alexnet-owt-4df8aa71.pth',
}

# Here we redefine AlexNet differently from the torchvision code for better understanding
class AlexNet(nn.Module):
    def __init__(self, num_classes=1000):
        super(AlexNet, self).__init__()
        self.conv1 = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=11, stride=4, padding=2),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),
        )
        self.conv2 = nn.Sequential(
            nn.Conv2d(64, 192, kernel_size=5, padding=2),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),
        )
        self.conv3 = nn.Sequential(
            nn.Conv2d(192, 384, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
        )
        self.conv4 = nn.Sequential(
            nn.Conv2d(384, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
        )
        self.conv5 = nn.Sequential(
            nn.Conv2d(256, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),
        )
        self.fc6 = nn.Sequential(
            nn.Dropout(),
            nn.Linear(256 * 6 * 6, 4096),
            nn.ReLU(inplace=True),
        )
        self.fc7 = nn.Sequential(
            nn.Dropout(),
            nn.Linear(4096, 4096),
        )
        self.fc8 = nn.Sequential(
            nn.ReLU(inplace=True),
            nn.Linear(4096, num_classes),
        )

    def forward(self, x):
        out1 = self.conv1(x)
        out2 = self.conv2(out1)
        out3 = self.conv3(out2)
        out4 = self.conv4(out3)
        out5 = self.conv5(out4)
        out5_reshaped = out5.view(out5.size(0), 256 * 6 * 6)
        out6 = self.fc6(out5_reshaped)
        out7 = self.fc7(out6)
        out8 = self.fc8(out7)
        return out1, out2, out3, out4, out5, out6, out7, out8

def alexnet(pretrained=False, **kwargs):
    """AlexNet model architecture from the "One weird trick..." paper
    (https://arxiv.org/abs/1404.5997).

    Args:
        pretrained (bool): If True, returns a model pre-trained on ImageNet
    """
    model = AlexNet(**kwargs)
    if pretrained:
        model.load_state_dict(model_zoo.load_url(model_urls['alexnet']))
    return model
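# @title (Optional) Quick check: shapes of the features returned by AlexNet.forward
# A minimal illustrative sketch (not part of the original tutorial): pass a random
# 224x224 "image" through a randomly initialized AlexNet and print the shape of each
# of the eight returned layer outputs. This only checks the architecture; the
# pretrained weights are loaded below.
with torch.no_grad():
    dummy_input = torch.randn(1, 3, 224, 224)
    dummy_feats = AlexNet()(dummy_input)
for i, feat in enumerate(dummy_feats, start=1):
    print(f"layer_{i}: {tuple(feat.shape)}")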
# @title Feature extraction code
def load_alexnet(model_checkpoints):
    """This function initializes an AlexNet and loads its weights from a
    pretrained model. Since we redefined the model in a different way, we have
    to rename the weights stored in the pretrained checkpoint.

    Parameters
    ----------
    model_checkpoints : str
        model checkpoint location.

    Returns
    -------
    model
        pytorch model of alexnet
    """
    model = alexnet()

    # Load checkpoint
    model_file = model_checkpoints
    checkpoint = torch.load(model_file, map_location=lambda storage, loc: storage)

    # Rename the checkpoint keys according to the new definition
    model_dict = ["conv1.0.weight", "conv1.0.bias", "conv2.0.weight", "conv2.0.bias",
                  "conv3.0.weight", "conv3.0.bias", "conv4.0.weight", "conv4.0.bias",
                  "conv5.0.weight", "conv5.0.bias", "fc6.1.weight", "fc6.1.bias",
                  "fc7.1.weight", "fc7.1.bias", "fc8.1.weight", "fc8.1.bias"]
    state_dict = {}
    i = 0
    for k, v in checkpoint.items():
        state_dict[model_dict[i]] = v
        i += 1

    # initialize the model with the pretrained weights
    model.load_state_dict(state_dict)
    if torch.cuda.is_available():
        model.cuda()
    model.eval()
    return model

def get_activations_and_save(model, image_list, activations_dir):
    """This function generates AlexNet features and saves them in a specified directory.

    Parameters
    ----------
    model : pytorch model : alexnet.
    image_list : list
        the list contains paths to all images.
    activations_dir : str
        save path for extracted features.
    """
    resize_normalize = trn.Compose([
        trn.Resize((224, 224)),
        trn.ToTensor(),
        trn.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])

    # for all images in the list, generate and save activations
    for image_file in tqdm(image_list):
        # open image
        img = Image.open(image_file)
        image_file_name = os.path.split(image_file)[-1].split(".")[0]
        # apply transformations before feeding the image to the model
        input_img = V(resize_normalize(img).unsqueeze(0))
        if torch.cuda.is_available():
            input_img = input_img.cuda()
        x = model.forward(input_img)
        activations = []
        for i, feat in enumerate(x):
            activations.append(feat.data.cpu().numpy().ravel())
        # filenames encode the layer name (layer_1 ... layer_8) so the activations
        # can be grouped per layer when building the model RDMs below
        for layer in range(len(activations)):
            save_path = os.path.join(activations_dir, f"{image_file_name}_layer_{layer + 1}.npy")
            np.save(save_path, activations[layer])

# get the paths to all the images in the stimulus set
image_dir = 'data/92_Image_Set/92images'
image_list = glob.glob(image_dir + '/*.jpg')
image_list.sort()
print(f'Total Number of Images: {len(image_list)}')

save_dir = "activations_alexnet"

######### load AlexNet initialized with pretrained weights ###################
# Download pretrained AlexNet from:
# https://download.pytorch.org/models/alexnet-owt-4df8aa71.pth
# and save it in the current directory
checkpoint_path = "alexnet.pth"
if not os.path.exists(checkpoint_path):
    url = "https://download.pytorch.org/models/alexnet-owt-4df8aa71.pth"
    urllib.request.urlretrieve(url, checkpoint_path)
model = load_alexnet(checkpoint_path)
##############################################################################

######### get and save activations ###########################################
activations_dir = os.path.join(save_dir)
if not os.path.exists(activations_dir):
    os.makedirs(activations_dir)
print("-------------Saving activations ----------------------------")
get_activations_and_save(model, image_list, activations_dir)
###############################################################################
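# @title (Optional) Inspect one saved activation file
# A minimal illustrative sketch (not part of the original tutorial): load a single
# saved .npy file back and print its size, as a quick check that feature extraction
# worked. Assumes the filenames end with "layer_1.npy", as saved above.
example_files = sorted(glob.glob(os.path.join(activations_dir, "*layer_1.npy")))
if example_files:
    example_vec = np.load(example_files[0])
    print(os.path.split(example_files[0])[-1], "-> flattened conv1 features, shape", example_vec.shape)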
num_layers = 8  # number of layers in the model
layers = []
for i in range(num_layers):
    layers.append(f"layer_{i + 1}")

model_RDMs = {}

# create an RDM for each layer from the saved activations
for layer in layers:
    activation_files = glob.glob(activations_dir + '/*' + layer + '.npy')
    activation_files.sort()
    activations = []
    # Load all activations for this layer
    for activation_file in activation_files:
        activations.append(np.load(activation_file))
    activations = np.array(activations)
    # calculate the correlation distance (1 - Pearson's r) for all pairwise comparisons
    model_RDMs[layer] = 1 - np.corrcoef(activations)

# @title Visualize model RDMs
layer = 'layer_8'  # @param ['layer_1','layer_2','layer_3','layer_4','layer_5','layer_6','layer_7','layer_8']

# load the layer RDM
RDM = np.array(model_RDMs[layer])

# Since the matrix is symmetric, we set the upper triangular values to NaN
RDM[np.triu_indices(RDM.shape[0], 1)] = np.nan

# Visualize the layer RDM
plt.imshow(RDM, cmap="bwr")
plt.title(layer + " RDM")
cbar = plt.colorbar()
plt.xlabel("Stimuli")
plt.ylabel("Stimuli")
cbar.ax.get_yaxis().labelpad = 15
cbar.ax.set_ylabel('1-Correlation', rotation=270)
plt.show()

# @title Correlating MEG RDMs with DNN RDMs
run = False
if run:
    num_timepoints = MEG_RDM_sub_averaged.shape[0]  # get number of timepoints

    # initialize dictionary entries to store the MEG-DNN RDM correlation at each timepoint
    for layer in layers:
        MEG_correlation[layer] = []

    # loop over the MEG RDMs at all timepoints and correlate each with the DNN RDMs
    for t in range(num_timepoints):
        MEG_RDM_t = MEG_RDM_sub_averaged[t, :, :]
        for layer in layers:
            model_RDM = model_RDMs[layer]
            MEG_correlation[layer].append(RSA_spearman(model_RDM, MEG_RDM_t))

# @title Plotting MEG-DNN comparison
if run:
    plt.rc('font', size=12)
    fig, ax = plt.subplots(figsize=(10, 6))
    # RDM index 0 corresponds to -100 ms relative to stimulus onset
    time_range = np.arange(num_timepoints) - 100
    ax.plot(time_range, MEG_correlation['layer_1'], color='tab:orange', label='layer_1')
    ax.plot(time_range, MEG_correlation['layer_7'], color='tab:blue', label='layer_7')
    ax.set_xlabel('Time (ms)')
    ax.set_ylabel("Spearman's correlation")
    ax.set_title('MEG-model comparison')
    ax.grid(True)
    ax.legend(loc='upper left')
    fig.show()
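# @title (Optional) Peak-latency summary
# A minimal illustrative sketch (not part of the original tutorial): print the time
# at which each stored correlation time course peaks. Assumes index 0 corresponds to
# -100 ms, as in the plots above; the DNN-layer entries only exist if run = True.
for name, corrs in MEG_correlation.items():
    if len(corrs) == 0:
        continue
    peak_idx = int(np.argmax(corrs))
    print(f"{name}: peak Spearman correlation {corrs[peak_idx]:.3f} at t = {peak_idx - 100} ms")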