The dynamical system of interest is:
\begin{equation}
\xi_0 \ddot{u}-\xi_1 u_{xx}-\xi_2 \dot{u}_{xx}-\xi_3 \ddot{u}_{xx} - \xi_4 u_{xxxx}+\beta u_x u_{xx} = 0.
\end{equation}
We want to identify the nonlinear parameter $\beta$ from the sensor measurement $s(t)=\dot{u}(x_m,t)$ recorded at location $x=x_m$. The sensor measurements were generated by simulating the above PDE with a spectral method. The initial conditions for the displacement and the velocity are given by:
\begin{align}
&u(x,0) = e^{-\frac{(x-x_0)^2}{2\sigma^2}} \nonumber\\
&\dot{u}(x,0) = \frac{\nu(x - x_0)}{\sigma^2}u(x,0),
\end{align}
where $\dot{u}(x,0):=\frac{\partial u(x,t)}{\partial t}|_{t=0}$ and $\nu$ is the wave speed.
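For reference, here is a minimal sketch of these initial conditions on a uniform grid; the values of $x_0$, $\sigma$, and $\nu$ below are illustrative placeholders, not the ones used to generate the dataset.

import numpy as np

x = np.linspace(0.0, 10.0, 512)               # spatial grid (illustrative)
x0, sigma, nu = 5.0, 0.5, 1.0                 # pulse center, width, wave speed (assumed)
u0 = np.exp(-(x - x0)**2 / (2.0 * sigma**2))  # u(x, 0)
v0 = nu * (x - x0) / sigma**2 * u0            # du/dt(x, 0)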
## Import necessary Python packages
import numpy as np
from numpy import interp
import math
import matplotlib.pyplot as plt
from scipy.linalg import lstsq
from scipy import signal
import os
import h5py
from pathlib import Path
import random
import time
import sys
sys.path.append('../')
from pytranskit.optrans.continuous import SCDT
import numpy.linalg as LA
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix
class SCDT_NLS_sysId:
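    """Nearest local subspace classifier in the SCDT domain, with
    inverse-distance-weighted estimation of the nonlinear parameter beta.
    Each class stores per-sample (optionally enriched) bases; a test signal
    is assigned to the class whose local subspace reconstructs it best."""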
def __init__(self, num_classes):
self.num_classes = num_classes
self.Nset = []
self.subspaces = []
self.len_subspace = 0
self.k = 1
self.label = []
self.pca_basis = []
self.n_enrV = 1
def fit(self, X, Y, Beta, no_local_enrichment=False):
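        """Fit on SCDT features X with class labels Y and per-sample beta values.
        A 70/30 split of (X, Y) is used to tune the neighborhood size k and the
        enrichment order N; the model is then rebuilt on the full data with
        per-sample (enriched) bases."""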
Xtrain, Xval, Ytrain, Yval = train_test_split(X, Y, test_size=0.3, random_state=0)
self.bas = []
for class_idx in range(self.num_classes):
            # generate the basis vectors
class_data = Xtrain[Ytrain == class_idx]
self.Nset.append(class_data)
self.label.append(class_idx)
bas = []
for j in range(class_data.shape[0]):
flat = np.copy(class_data[j].reshape(1,-1))
u, s, vh = LA.svd(flat,full_matrices=False)
bas.append(vh[:flat.shape[0]])
self.bas.append(bas)
if Xtrain.shape[0]//self.num_classes < 4:
self.k = 1
else:
smp_class = []
for i in range(len(np.unique(Ytrain))):
smp_class.append(np.count_nonzero(Ytrain == i))
k_range = range(1,min(min(smp_class),100))
n_range = range(-1,2) # range(-1,0) means N=-1, i.e., no tuning of N
self.k, self.n_enrV = self.find_kN(Xval, Yval, k_range, n_range)
self.Nset = []
self.label = []
self.bas = []
self.Beta = []
for class_idx in range(self.num_classes):
            # generate the basis vectors
class_data = X[Y == class_idx]
self.Beta.append(Beta[Y == class_idx])
self.Nset.append(class_data)
self.label.append(class_idx)
bas = []
for j in range(class_data.shape[0]):
if no_local_enrichment:
flat = np.copy(class_data[j].reshape(1,-1))
else:
flat = self.add_k_enrich(class_data[j].reshape(1,-1), k=self.n_enrV) # k=0 => translation only
u, s, vh = LA.svd(flat,full_matrices=False)
bas.append(vh[:flat.shape[0]])
self.bas.append(bas)
def predict(self, X, use_gpu=False, k=None, N=None):
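        """Classify each row of X by the nearest local subspace over the k
        closest training samples per class; also forms a per-class beta estimate
        by inverse-distance weighting over those neighbors. Returns the predicted
        class labels; k and N default to the values tuned in fit()."""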
if k is not None:
k_opt = k
else:
k_opt = self.k
if N is not None:
n_opt = N
else:
n_opt = self.n_enrV
D = []
Beta = []
for class_idx in range(self.num_classes):
Xi = self.Nset[class_idx]
Xi_bas = self.bas[class_idx]
Xi_beta = self.Beta[class_idx]
d = np.zeros([X.shape[0],1])
beta_class = np.zeros([X.shape[0],1])
B = []
L_basis = []
for i in range(X.shape[0]):
x = X[i,:]
dist_i = []
for j in range(Xi.shape[0]):
                    basj = Xi_bas[j]
                    projR = x @ basj.T @ basj  # reconstruct x from the j-th sample's basis
                    dist_i.append(LA.norm(projR - x))
dist_i = np.stack(dist_i)
indx = dist_i.argsort()[:k_opt]
# calculate est_beta for this class
f_norm = np.sum([1./dist_i[j] for j in indx])
beta_class[i] = np.sum([Xi_beta[j]/dist_i[j] for j in indx])/f_norm
# calculate local subspace distance
Ni = self.add_k_enrich(Xi[indx,:], k=n_opt) # k=0 => translation only
u, s, vh = LA.svd(Ni,full_matrices=False)
cum_s = np.cumsum(s)
cum_s = cum_s/np.max(cum_s)
basis = vh[:Ni.shape[0]]
B.append(basis)
L_basis.append((np.where(cum_s>=0.99)[0])[0]+1)
max_basis = min(L_basis)
Beta.append(np.squeeze(beta_class))
for i in range(X.shape[0]):
x = X[i,:]
basis = B[i][:max_basis,:]
                proj = x @ basis.T    # coefficients in the local subspace, shape (max_basis,)
                projR = proj @ basis  # reconstruction, shape (n_features,)
d[i]=LA.norm(projR - x)
D.append(np.squeeze(d))
D = np.stack(D, axis=0)
preds = np.argmin(D, axis=0)
pred_label = [self.label[i] for i in preds]
return pred_label
def find_kN(self, X, y, k_range, n_range):
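        """Tune the neighborhood size k and the enrichment order N on the
        validation set (X, y): sweep k first with early stopping once accuracy
        stops improving, then sweep N at the selected k."""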
n = X.shape[0]
max_acc = 0.
score_prev = 0.
k_opt = 1
count = 0
acc_count = 0
### calculate distances for samples in validation set
indx = []
for i in range(X.shape[0]):
x = np.copy(X[i,:])
indXi = []
for class_idx in range(self.num_classes):
Xi = self.Nset[class_idx]
Xi_bas = self.bas[class_idx]
dist_i = []
for j in range(Xi.shape[0]):
                    basj = Xi_bas[j]
                    projR = x @ basj.T @ basj  # reconstruct x from the j-th sample's basis
                    dist_i.append(LA.norm(projR - x))
dist_i = np.stack(dist_i)
indXi.append(dist_i.argsort()[:max(k_range)+1])
indx.append(indXi)
### tune k using validation set
for k in k_range:
D = []
for class_idx in range(self.num_classes):
Xi = self.Nset[class_idx]
d = np.zeros([X.shape[0],1])
B = []
L_basis = []
for i in range(X.shape[0]):
x = np.copy(X[i,:])
ind = indx[i][class_idx]
Ni = np.copy(Xi[ind[:k],:])
u, s, vh = LA.svd(Ni,full_matrices=False)
cum_s = np.cumsum(s)
cum_s = cum_s/np.max(cum_s)
basis = vh[:Ni.shape[0]]
B.append(basis)
L_basis.append((np.where(cum_s>=0.99)[0])[0]+1)
max_basis = min(L_basis)
for i in range(X.shape[0]):
x = np.copy(X[i,:])
basis = B[i][:max_basis,:]
                    projR = x @ basis.T @ basis  # reconstruction, shape (n_features,)
d[i]=LA.norm(projR - x)
D.append(np.squeeze(d))
D = np.stack(D, axis=0)
preds = np.argmin(D, axis=0)
pred_label = [self.label[i] for i in preds]
score = (np.sum(pred_label == y))/n
print('Validation accuracy: {} with k = {}'.format(score, k))
if score > score_prev:
count = 0
else:
count = count + 1
if score >= max_acc:
max_acc = score
k_opt = k
acc_count = 0
#count = 0
else:
acc_count = acc_count + 1
if count == 10 or acc_count == 20:
break
score_prev = score
### tune N using validation set
if len(n_range) == 1:
return k_opt, n_range[0]
n_iter = []
max_acc = 0.
score_prev = 0.
n_opt = 1
count = 0
acc_count = 0
for n_enr in n_range:
print('\nN = {}'.format(n_enr))
n_iter.append(n_enr)
D = []
for class_idx in range(self.num_classes):
Xi = self.Nset[class_idx]
d = np.zeros([X.shape[0],1])
B = []
L_basis = []
for i in range(X.shape[0]):
x = np.copy(X[i,:])
ind = indx[i][class_idx]
Ni = self.add_k_enrich(Xi[ind[:k_opt],:], k=n_enr) # k=0 => translation only
u, s, vh = LA.svd(Ni,full_matrices=False)
cum_s = np.cumsum(s)
cum_s = cum_s/np.max(cum_s)
basis = vh[:Ni.shape[0]]
B.append(basis)
L_basis.append((np.where(cum_s>=0.99)[0])[0]+1)
max_basis = min(L_basis)
for i in range(X.shape[0]):
x = X[i,:]
basis = B[i][:max_basis,:]
                    projR = x @ basis.T @ basis  # reconstruction, shape (n_features,)
d[i]=LA.norm(projR - x)
D.append(np.squeeze(d))
D = np.stack(D, axis=0)
preds = np.argmin(D, axis=0)
pred_label = [self.label[i] for i in preds]
score = (np.sum(pred_label == y))/n
print('Validation accuracy: {} with k = {}'.format(score, k_opt))
if score > max_acc:
max_acc = score
acc_count = 0
n_opt = n_enr
else:
acc_count = acc_count + 1
if score > score_prev:
count = 0
else:
count = count + 1
if count == 10 or acc_count == 20:
break
score_prev = score
return k_opt, n_opt
def score(self, X, y):
# print('Optimum k: {}'.format(self.k))
# print('Optimum N: {}'.format(self.n_enrV))
n = X.shape[0]
y_pred = self.predict(X)
n_correct = np.sum(y_pred == y)
return n_correct/n, y_pred
def add_k_enrich(self, scdt_features, k):
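        """Enrich SCDT features with a constant (translation) row and the
        deformed copies s - sin(i*pi*s)/(|i|*pi) for i = -k..k, i != 0;
        k < 0 returns the features unchanged (no enrichment)."""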
        # scdt_features: array of shape (n_samples, n_features)
if k<0:
return scdt_features
        v = np.ones([1, scdt_features.shape[1]])  # translation (constant) direction
        for i in range(-k, k+1):
            if i != 0:
                vi = scdt_features - np.sin(i*np.pi*scdt_features)/(np.abs(i)*np.pi)
                v = np.concatenate((v, vi))
        return np.concatenate((scdt_features, v))
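As a quick sanity check, here is a minimal illustrative use of `add_k_enrich` on a toy feature matrix (the input below is arbitrary); the row count follows directly from the loop in the method:

demo = SCDT_NLS_sysId(num_classes=2)
toy = np.random.randn(3, 50)        # 3 toy SCDT feature rows
enriched = demo.add_k_enrich(toy, k=1)
# 3 originals + 1 translation row + 2*3 deformed copies (i = -1, +1) = 10 rows
print(enriched.shape)               # (10, 50)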
datadir = '../../../PDE_nonlinear_systems/Codes/data/PDE' # '/Path/to/data/'
## dataset options:
# 2 class: 1D_wave_deform_2class_2k
# 3 class: 1D_wave_deform_3class_2k
# 10 class: 1D_wave_deform_10class_2k
dataset = '1D_wave_deform_3class_2k'
print('\n===='+dataset+'====\n')
data_file = os.path.join(datadir, dataset, 'train.hdf5')
with h5py.File(data_file, 'r') as f:
x_train = f['x_train'][()]
y_train = f['y_train'][()]
t_train = f['t_train'][()]
p_train = f['p_train'][()]
num_classes = len(np.unique(y_train))
data_file = os.path.join(datadir, dataset, 'test.hdf5')
with h5py.File(data_file, 'r') as f:
x_test = f['x_test'][()]
y_test = f['y_test'][()]
t_test = f['t_test'][()]
p_test = f['p_test'][()]
print('Number of classes: {}'.format(num_classes))
print('Train set: {}'.format(x_train.shape))
print('Test set: {}'.format(x_test.shape))
====1D_wave_deform_3class_2k====

Number of classes: 3
Train set: (6000, 375)
Test set: (600, 375)
# class ranges
if num_classes==2:
class_int = [[0.0, 0.], [0.01, 0.6]]
elif num_classes==3:
class_int = [[0.01, 0.2], [0.21, 0.4], [0.41, 0.6]]
elif num_classes==10:
class_int = [[0.0, 0.06], [0.07, 0.12], [0.13, 0.18], [0.19, 0.24], [0.25, 0.30],
[0.31, 0.36], [0.37, 0.42], [0.43, 0.48], [0.49, 0.54], [0.55, 0.60]]
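# Each class corresponds to an interval of beta values; a predicted label is
# converted to a point estimate of beta via its interval midpoint in the MSE
# computation at the end, e.g. [np.mean(ci) for ci in class_int] gives
# [0.105, 0.305, 0.505] for the 3-class setup.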
N = x_train.shape[1]
## define uniform reference signal
t0 = np.linspace(0,1,N) # Domain of the reference
s0 = np.ones(N)
s0 = s0/s0.sum()
scdt = SCDT(reference=s0,x0=t0)
rm_edge = True
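# rm_edge=True trims the edge samples of each SCDT, which are the most
# sensitive to boundary effects. Each feature vector below concatenates the
# SCDTs of the signal's positive and negative parts and their two total
# masses, as returned by scdt.stransform.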
cache_file = os.path.join(datadir, dataset, 'scdt_wMass.hdf5') # 'scdt_wMass.hdf5'
# If the SCDTs are already computed, then load from the saved file.
# Otherwise, compute SCDTs for all the samples and save them in the
# same directory as the dataset
if os.path.exists(cache_file):
with h5py.File(cache_file, 'r') as f:
data_train, y_train = f['data_train'][()], f['y_train'][()]
data_test, y_test = f['data_test'][()], f['y_test'][()]
data_train, data_test = data_train.astype(np.float32), data_test.astype(np.float32)
print('loaded from cache file data: x_train {} x_test {}'.format(data_train.shape, data_test.shape))
else:
with h5py.File(cache_file, 'w') as f:
data_train = []
data_test = []
for i in range(x_train.shape[0]):
Ipos, Ineg, Imasspos, Imassneg = scdt.stransform(x_train[i])
if rm_edge:
data_train.append(np.concatenate((Ipos[1:-2],Ineg[1:-2],Imasspos.reshape(1),Imassneg.reshape(1)),axis=0))
else:
data_train.append(np.concatenate((Ipos[:-1],Ineg[:-1],Imasspos.reshape(1),Imassneg.reshape(1)),axis=0))
for i in range(x_test.shape[0]):
Ipos, Ineg, Imasspos, Imassneg = scdt.stransform(x_test[i])
if rm_edge:
data_test.append(np.concatenate((Ipos[1:-2],Ineg[1:-2], Imasspos.reshape(1), Imassneg.reshape(1)),axis=0))
else:
data_test.append(np.concatenate((Ipos[:-1],Ineg[:-1], Imasspos.reshape(1), Imassneg.reshape(1)),axis=0))
data_train = np.stack(data_train)
data_test = np.stack(data_test)
print('Train set: {}'.format(data_train.shape))
print('Test set: {}'.format(data_test.shape))
data_train, data_test = data_train.astype(np.float32), data_test.astype(np.float32)
f.create_dataset('data_train', data=data_train)
f.create_dataset('y_train', data=y_train)
f.create_dataset('data_test', data=data_test)
f.create_dataset('y_test', data=y_test)
print('saved to {}'.format(cache_file))
loaded from cache file data: x_train (6000, 746) x_test (600, 746)
print('Dataset: '+dataset+'\n')
accuracies = []
predictions = []
cls = SCDT_NLS_sysId(num_classes)
print('\nTrain the model...')
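# The last column of p_train/p_test holds the ground-truth beta for each sample.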
cls.fit(data_train, y_train, p_train[:,-1]) # [:,:-2]
print('\nTest the model...')
acc, pred = cls.score(data_test[:,:], y_test[:]) # [:,:-2]
print('\n\n+++++++++++++++++++Test Results+++++++++++++++++++\n')
print('Number of classes: {}\n'.format(num_classes))
print('Detection Accuracy: {:.2f}%'.format(acc*100.))
print('\nConfusion matrix:')
print(confusion_matrix(y_test, pred))
accuracies.append(acc*100.)
predictions.append(pred)
se = []
for i in range(len(pred)):
beta = p_test[i,-1]
se.append((beta - np.mean(class_int[pred[i]]))**2)
MSE_dist = np.mean(se)
print('\nEstimation Error (MSE): {}'.format(MSE_dist))
Dataset: 1D_wave_deform_3class_2k

Train the model...
Validation accuracy: 0.9483333333333334 with k = 1
Validation accuracy: 0.9522222222222222 with k = 2
Validation accuracy: 0.9555555555555556 with k = 3
Validation accuracy: 0.9577777777777777 with k = 4
Validation accuracy: 0.9566666666666667 with k = 5
Validation accuracy: 0.9566666666666667 with k = 6
Validation accuracy: 0.9566666666666667 with k = 7
Validation accuracy: 0.9583333333333334 with k = 8
Validation accuracy: 0.9583333333333334 with k = 9
Validation accuracy: 0.9572222222222222 with k = 10
Validation accuracy: 0.9544444444444444 with k = 11
Validation accuracy: 0.9505555555555556 with k = 12
Validation accuracy: 0.9527777777777777 with k = 13
Validation accuracy: 0.9516666666666667 with k = 14
Validation accuracy: 0.9527777777777777 with k = 15
Validation accuracy: 0.9511111111111111 with k = 16
Validation accuracy: 0.9488888888888889 with k = 17
Validation accuracy: 0.9472222222222222 with k = 18
Validation accuracy: 0.9483333333333334 with k = 19
Validation accuracy: 0.9444444444444444 with k = 20
Validation accuracy: 0.9444444444444444 with k = 21
Validation accuracy: 0.9438888888888889 with k = 22
Validation accuracy: 0.9438888888888889 with k = 23
Validation accuracy: 0.9422222222222222 with k = 24
Validation accuracy: 0.9427777777777778 with k = 25
Validation accuracy: 0.9422222222222222 with k = 26
Validation accuracy: 0.9416666666666667 with k = 27
Validation accuracy: 0.9411111111111111 with k = 28
Validation accuracy: 0.9416666666666667 with k = 29

N = -1
Validation accuracy: 0.9583333333333334 with k = 9

N = 0
Validation accuracy: 0.9477777777777778 with k = 9

N = 1
Validation accuracy: 0.9466666666666667 with k = 9

Test the model...

+++++++++++++++++++Test Results+++++++++++++++++++

Number of classes: 3

Detection Accuracy: 97.17%

Confusion matrix:
[[197   3   0]
 [  6 190   4]
 [  0   4 196]]

Estimation Error (MSE): 0.0031443231976238926