#!/usr/bin/env python # coding: utf-8 # In[2]: import warnings warnings.filterwarnings('ignore') # In[3]: ############################# Importing packages ################################### import numpy as np # to handle arrays and matrices import pickle from scipy.linalg import toeplitz # to generate toeplitz matrix from scipy.stats import chi2 # to have chi2 quantiles from scipy.special import chdtri import matplotlib.pyplot as plt # to plot histograms ... import pandas as pd # to handle and create dataframes import time import concurrent.futures import random import os from itertools import product ####################### For printing with colors ######################### class color: PURPLE = '\033[95m' BLACK = '\033[1;90m' CYAN = '\033[96m' DARKCYAN = '\033[36m' BLUE = '\033[94m' GREEN = '\033[1;92m' YELLOW = '\033[93m' RED = '\033[1;91m' BOLD = '\033[1m' UNDERLINE = '\033[4m' END = '\033[0m' BBCKGRND = '\033[0;100m' RBCKGRND = '\033[0;101m' print(color.BLUE + color.BOLD + '******************** Starting the program ! ********************' + color.END ) # In[5]: # importing q_list, n_list, S2 diagonals, for different sigma (sigma means here the std of underluying normal # distribution that generates lognormal vectors), having the same alpha q_list = pickle.load(open("q_list", "rb")) n_list = pickle.load(open("n_list", "rb")) vp_collection_d099 = pickle.load(open("vp_collection_d099", "rb")) # In[6]: def scalar(A,B): """Takes two symmetric matrices A and B of sizes q and returns the modified frobenius scalar of A and B """ return(np.trace(A.dot(np.transpose(B)))/A.shape[0]) def norm(A): """Takes a symmetric matrix A of sizes q and returns the norm of A This norm is associated to the modified frobenius scalar """ return np.sqrt(scalar(A,A)) def alphaaa2(vec): q = len(vec) I_q = np.diag(np.ones(q)) S_2 = np.diag(vec) sigma2 = scalar(S_2,I_q) alpha2 = norm(S_2 - sigma2*I_q)**2 return alpha2 # In[7]: print("q_list: ", q_list) print("n_list: ",n_list) # In[8]: list(product([1,2],[3,5,6])) # In[9]: for i in range(5): if i == 0 : array = np.array(i*np.ones(5, dtype=np.int)) else : array = np.vstack((array,np.array(i*np.ones(5, dtype=np.int)))) array # In[10]: n_list = list(n_list) ; q_list = list(q_list) # In[11]: ######################## Generating/reading eigenvalues and saving them in a vp file ######################### valeur_propre_collection = vp_collection_d099 valeur_propre_collection[q_list.index(50)] # In[16]: print([alphaaa2(i) for i in vp_collection_d099]) # In[ ]: # In[10]: # list(product(n_list, range(K)))[:12] # In[19]: ########################## Choosing dimension ###################################### #Β K = int(input("Number of Monte Carlo iterations is : ")) print() print("list of q values : ", q_list, "\n") print("list of n values : ", n_list, "\n") ######################### Generator function ################################ def generator(n, q): """ Creating a function that gets into paramaters : 𝑛 the number of observations π‘ž the dimension 𝑠 dependency threshold 𝜎² covariance parameter and returns a matrix of n observations (n rows), where each row represents a q-vector normally distributed with a mean 0 and covariance matrix SΒ² defined by a toeplitz matrix with a threshold s """ # defining eigenvalues, lognormal variables valeur_propre = np.diag(valeur_propre_collection[q_list.index(q)]) # defining a mean vector and a covariance matrix mean = np.zeros(q) ; cov = valeur_propre # z_intermediate = q rows and n columns we still need to transpose z_int = np.random.multivariate_normal(mean, cov, n).T # z sample matrix, having n rows and q columns z = np.transpose(z_int) # Returning the matrix of n-observations of dimension q return z ########################### Statistics functions ##################### def sn2(z): return np.cov(np.transpose(z)) def zbar(z): """takes a n x q sample matrix and return the vector mean of each column """ return z.mean(axis=0) def max_p(M): """Largest eigenvalue of a given matrix M""" val_p = np.linalg.eigvals(M) return max(val_p.real) def min_p(M): """Smallest eigenvalue of a given matrix M that is not null""" val_p = np.linalg.eigvals(M) val_p = val_p[val_p>=10**-6] return min(val_p.real) def product_vect(z, i): return np.matmul(z[i].reshape(z[i].shape[0], 1), np.transpose(z[i].reshape(z[i].shape[0], 1))) ###############################Β Algebra functions ##################################### def scalar(A,B): """Takes two symmetric matrices A and B of sizes q and returns the modified frobenius scalar of A and B """ return(np.trace(A.dot(np.transpose(B)))/A.shape[0]) def norm(A): """Takes a symmetric matrix A of sizes q and returns the norm of A This norm is associated to the modified frobenius scalar """ return np.sqrt(scalar(A,A)) # # Time comparision # In[20]: def monte_carlo(k): # random.seed(k) np.random.seed(int(os.getpid() * time.time()) % 123456789) z = generator(n,q) sn_2 = sn2(z) # beta = (norm(sn_2 - s_2))**2 # calculate empirical mean z_bar = zbar(z) #empirical covariance matrix sn_2 = sn2(z) # Calculating empirical eigenvalues and eigenvectors (of SnΒ²) emp_val_p, emp_vec_p = np.linalg.eigh(sn_2) # Calculating true (theoretical) eigenvalues and eigenvectors (of SΒ²) # val_p, vec_p = np.linalg.eigh(s_2) # Identity matrix of size q I_q = np.diag(np.ones(q)) # sigma_n (πœŽΒ²π‘›) sigma_n = scalar(sn_2, I_q) # delta_n (𝛿𝑛²) delta_n = norm(sn_2 - sigma_n*I_q)**2 # intermediate beta_n beta_bar_n = (1/n**2)*0 for i in range(n): beta_bar_n += (1/n**2)*norm(product_vect(z, i) - sn_2)**2 # beta_n (𝛽𝑛²) beta_n = min(beta_bar_n, delta_n) # alpha_n (𝛼²𝑛) alpha_n = delta_n - beta_n # rho_n (πœŒβˆ—Β²π‘›) rho_n = (beta_n/alpha_n)*sigma_n rho_1_n = (beta_n/delta_n)*sigma_n rho_2_n = alpha_n/delta_n Sigma_n_hat_ast = sn_2 + rho_n*I_q Sigma_n_hat = rho_1_n*I_q + rho_2_n*sn_2 self_norm_sum = n*z_bar.dot(np.linalg.inv(Sigma_n_hat).dot(np.transpose(z_bar))) self_norm_sum_ast = n*z_bar.dot(np.linalg.inv(Sigma_n_hat_ast).dot(np.transpose(z_bar))) return np.array([self_norm_sum, self_norm_sum_ast, beta_n, sigma_n, alpha_n, rho_n], dtype=np.int) t1 = time.perf_counter() dico = {} for n in n_list[:3]: for q in q_list[:3]: if n <= q : dico[(n,q)] = np.stack(list(map(monte_carlo, range(10)))) print(dico[list(dico.keys())[0]]) t2 = time.perf_counter() print(f'Finished in {t2-t1} seconds') # In[21]: len(dico.keys()) # In[22]: t1 = time.perf_counter() data = {} for q in q_list[:3]: for n in n_list[:3]: if n <= q : with concurrent.futures.ProcessPoolExecutor() as executor: f1 = np.stack(list(executor.map(monte_carlo, range(10)))) #f2 = f1.result() data[(n,q)] = f1 # if q/n%1==0 : print(str(i)+","+str(j)+")", #color.BLUE + color.BOLD + f"for n = %d \t and q = %d"%(n,q) + color.END,"\n", #f1, "\n\n") print(data[list(data.keys())[0]]) t2 = time.perf_counter() print(f'Finished in {t2-t1} seconds') # In[23]: data[list(data.keys())[1]] # # Generating step # In[24]: K = int(input("Number of Monte Carlo iterations is : ")) # In[25]: t_init = time.perf_counter() dico = {} for n in n_list: for q in q_list: t1 = time.perf_counter() if n <= q : dico[(n,q)] = np.stack(list(map(monte_carlo, range(10)))) t2 = time.perf_counter() if q==n or q==2*n : print(f"q = {q} and n = {n} --> ",f'Finished in {round(t2-t1, 4)} seconds') print(dico[list(dico.keys())[0]][:10]) t_fin = time.perf_counter() print() print(f'All loops finished in {round(t_fin-t_init, 3)} seconds') # In[26]: def monte_carlo(k): # random.seed(k) np.random.seed(int(os.getpid() * time.time()) % 123456789) z = generator(n,q) sn_2 = sn2(z) s_2 = np.diag(valeur_propre_collection[q_list.index(q)]) beta = (norm(sn_2 - s_2))**2 # calculate empirical mean z_bar = zbar(z) #empirical covariance matrix sn_2 = sn2(z) # Calculating empirical eigenvalues and eigenvectors (of SnΒ²) emp_val_p, emp_vec_p = np.linalg.eigh(sn_2) # Calculating true (theoretical) eigenvalues and eigenvectors (of SΒ²) # val_p, vec_p = np.linalg.eigh(s_2) # Identity matrix of size q I_q = np.diag(np.ones(q)) # sigma_n (πœŽΒ²π‘›) sigma_n = scalar(sn_2, I_q) # delta_n (𝛿𝑛²) delta_n = norm(sn_2 - sigma_n*I_q)**2 # intermediate beta_n beta_bar_n = (1/n**2)*0 for i in range(n): beta_bar_n += (1/n**2)*norm(product_vect(z, i) - sn_2)**2 # beta_n (𝛽𝑛²) beta_n = min(beta_bar_n, delta_n) # alpha_n (𝛼²𝑛) alpha_n = delta_n - beta_n # rho_n (πœŒβˆ—Β²π‘›) rho_n = (beta_n/alpha_n)*sigma_n rho_1_n = (beta_n/delta_n)*sigma_n rho_2_n = alpha_n/delta_n Sigma_n_hat_ast = sn_2 + rho_n*I_q Sigma_n_hat = rho_1_n*I_q + rho_2_n*sn_2 self_norm_sum = n*z_bar.dot(np.linalg.inv(Sigma_n_hat).dot(np.transpose(z_bar))) self_norm_sum_ast = n*z_bar.dot(np.linalg.inv(Sigma_n_hat_ast).dot(np.transpose(z_bar))) return np.array([self_norm_sum, self_norm_sum_ast, beta_n, sigma_n, alpha_n, rho_n, max_p(sn_2), min_p(sn_2), beta]) # In[27]: print(color.BLUE + color.BOLD + '******************** Starting the extraction ! ********************' + color.END ) K = int(input("Number of Monte Carlo iterations is : ")) t_init = time.perf_counter() dico = {} for n in n_list: for q in q_list: t1 = time.perf_counter() if n <= q : dico[(n,q)] = np.stack(list(map(monte_carlo, range(K)))) t2 = time.perf_counter() if q==n or q==2*n : print(f"q = {q} and n = {n} --> ",f'Finished in {round(t2-t1, 4)} seconds') print(dico[list(dico.keys())[0]][:10]) t_fin = time.perf_counter() print() print(f'All loops finished in {round(t_fin-t_init, 3)} seconds') # In[28]: pickle.dump(dico, open("data_vp_collection_d099", "wb")) # In[29]: aaa = pickle.load(open("data_vp_collection_d099", "rb")) len(aaa[list(aaa.keys())[0]]) # In[30]: aaa[list(aaa.keys())[0]][0] # In[ ]: # In[ ]: