#!/usr/bin/env python
# coding: utf-8
# # Data-driven Multimodal Alignment
# Author: Ruchit Agrawal
#
# This notebook demonstrates how time-series data pertaining to different modalities can be aligned using deep learning.
# In[1]:
import os
import sys
import librosa
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.spatial.distance import cdist
# In[2]:
# Use the ggplot look for every figure produced below.
plt.style.use('ggplot')
# In[3]:
# Beat-time table for Mazurka 06-1, read from the working directory.
df = pd.read_csv('M06-1beat_time.csv')
# In[4]:
### Data visualization
df
# In[5]:
# Plot every column from the sixth one onwards (the same columns gen_data
# later matches against recording file names).
beat_axes = df.iloc[:, 5:].plot(figsize=(20, 12))
beat_axes.set_title('Variations of beats across different versions of M06-1')
# ## Data generation
# ---
# - `gen_data(folder, csv)`
# - **folder** : path containing files corresponding to a specific Mazurka
# - **csv** : path of the csv file for that Mazurka
#
# - **return** : yields one (euclidean distance matrix, aligned path values) pair per pair of recordings; the path is interpolated (and extrapolated) at every column index of the distance matrix
# - `get_matrix(file1, file2)`
# - **file1** : location of the file to be presented on the x-axis of the distance matrix
# - **file2** : location of the file to be presented on the y-axis of the distance matrix
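# - **return** : euclidean distance matrix, the length ratio (`factor`) of the two recordings, and a `flipped` flag telling whether the two inputs were swapped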
# In[6]:
import numpy as np # linear algebra
import random
import pandas as pd # data processing
import matplotlib.pyplot as plt
from pandas import datetime
import math, time
import itertools
from sklearn import preprocessing
from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import StandardScaler
import datetime
from operator import itemgetter
from sklearn.metrics import mean_squared_error
from math import sqrt
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
# In[7]:
import warnings
warnings.filterwarnings('ignore')
# In[8]:
import os
import sys
import librosa
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.spatial.distance import cdist
from scipy.interpolate import interp1d
from tqdm import tqdm
def get_matrix(file1, file2):
    """Build the Euclidean distance matrix between the chroma features of two recordings.

    Both recordings are loaded with librosa. The one with more samples is
    analysed with a fixed hop length; the other gets a hop length scaled by
    the sample-count ratio, so both feature sequences end up with a similar
    number of frames.

    Args:
        file1: Path of the first recording, without the ``.mp4`` extension.
        file2: Path of the second recording, without the ``.mp4`` extension.

    Returns:
        A tuple ``(euc_dists, factor, flipped)`` where

        * ``euc_dists`` holds the pairwise Euclidean distances between chroma
          frames, with the sequence that has fewer (or equally many) frames
          on the rows;
        * ``factor`` is the sample-count ratio ``shorter / longer``;
        * ``flipped`` is True when ``file1`` was the shorter recording and
          the two were swapped internally.
    """
    file1 = file1 + '.mp4'
    file2 = file2 + '.mp4'
    y1, sr1 = librosa.load(file1)
    y2, sr2 = librosa.load(file2)

    # Make (y1, sr1) always refer to the recording with more samples.
    flipped = y1.shape[0] < y2.shape[0]
    if flipped:
        y1, y2 = y2, y1
        sr1, sr2 = sr2, sr1
    factor = y2.shape[0] / y1.shape[0]

    N = 4410  # 2048 default value
    H = 2205  # 512 default value
    a1 = librosa.feature.chroma_stft(y=y1, sr=sr1, hop_length=H, n_fft=N)
    # The shorter recording uses a proportionally smaller hop.
    a2 = librosa.feature.chroma_stft(y=y2, sr=sr2, hop_length=int(H * factor), n_fft=N)

    # Frames on the first axis, chroma bins on the second.
    x1 = a1.T
    x2 = a2.T

    # Keep the sequence with fewer frames on the rows of the matrix.
    if x1.shape[0] > x2.shape[0]:
        row_frames, col_frames = x2, x1
    else:
        row_frames, col_frames = x1, x2
    euc_dists = cdist(row_frames, col_frames, metric='euclidean')
    return euc_dists, factor, flipped
def gen_data(folder,csv):
    """Yield a distance matrix and its aligned path for every pair of recordings.

    Columns of the CSV from the sixth one onwards are treated as recordings.
    A pair is processed only when BOTH recordings have a matching ``.mp4``
    file in ``folder``.

    Args:
        folder: Directory containing the ``.mp4`` recordings.
        csv: Path of the CSV file holding one column of beat times per recording.

    Yields:
        Tuples ``(euc_dists, ynew)``: the distance matrix returned by
        ``get_matrix`` and the path value interpolated (and extrapolated) at
        every column index of that matrix.
    """
    available = {
        entry.replace(".mp4", "") for entry in os.listdir(folder) if '.mp4' in entry
    }
    df = pd.read_csv(csv)
    recordings = list(df.columns[5:])

    for i, first in enumerate(recordings):
        for second in recordings[i + 1:]:
            # Both recordings must exist on disk; otherwise loading would fail.
            if first not in available or second not in available:
                continue
            x, y = df[first].values, df[second].values
            euc_dists, factor, flipped = get_matrix(f'{folder}/{first}', f'{folder}/{second}')
            # Presumably converts beat times to feature-frame positions; the
            # recording that get_matrix analysed with the scaled hop is
            # rescaled by 1/factor -- TODO confirm units of the CSV columns.
            if not flipped:
                x = np.round(x * 10, 4)
                y = np.round(y * 10 * (1/factor), 4)
            else:
                x = np.round(x * 10 * (1/factor), 4)
                y = np.round(y * 10, 4)
                # get_matrix swapped the recordings, so swap the roles here too.
                x, y = y, x
            f = interp1d(y, x, kind='slinear', fill_value="extrapolate")
            xnew = np.arange(0, euc_dists.shape[1], 1)
            ynew = f(xnew)
            yield euc_dists, ynew
# ### **Testing with one iteration over the data loader**
# In[9]:
# Pull at most ten (distance matrix, path) pairs out of the generator.
el, pl = [], []
i = 0
pair_stream = tqdm(gen_data(folder='/media/shredpub/Elements/documents/datasets/Mazurkas/mazurka06-1', csv='M06-1beat_time.csv'))
for i, (e, p) in enumerate(pair_stream, start=1):
    print('i', i - 1, e.shape, p.shape)
    el.append(e)
    pl.append(p)
    if i == 10:
        # Stop before the generator computes an eleventh pair.
        break
# ### Plotting sample data
# In[10]:
# Draw up to ten collected pairs on a 5 x 2 grid: the distance matrix as an
# image with its aligned path on top as a dashed red line.
fig = plt.figure(figsize=(20, 60))
# Slicing keeps this working when fewer than ten pairs were collected.
for slot, (dist_matrix, path) in enumerate(zip(el[:10], pl[:10]), start=1):
    ax = fig.add_subplot(5, 2, slot)
    ax.plot(path, 'r--')
    ax.imshow(dist_matrix, interpolation='nearest', origin='lower', aspect='auto')
    ax.set_title('Euclidean distance matrix and aligned path')
# ### Positional Encoding
# In[11]:
def get_pe(e):
    """Return a sinusoidal positional-encoding matrix with the shape of ``e``.

    Every column is constant along the rows. For each even column index
    ``c`` that has a right-hand neighbour, column ``c`` holds
    ``sin(pi * c / n_rows) / 2`` and column ``c + 1`` holds
    ``cos(pi * c / n_rows) / 2``. When ``e`` has an odd number of columns
    the last column stays zero.

    Args:
        e: 2-D array (anything exposing a two-element ``shape``).

    Returns:
        A ``float32`` NumPy array of shape ``e.shape`` with values in
        ``[-0.5, 0.5]``.
    """
    n_rows, n_cols = e.shape[0], e.shape[1]
    pe = np.zeros(e.shape, dtype=np.float32)

    # Only columns that form a complete (sin, cos) pair are filled.
    even_cols = np.arange(0, n_cols - (n_cols % 2), 2)
    if even_cols.size:
        angles = np.pi * (even_cols / n_rows)
        pe[:, even_cols] = np.sin(angles) / 2
        pe[:, even_cols + 1] = np.cos(angles) / 2
    return pe
# In[12]:
def norm_p(p,e):
    """Cap every value of ``p`` at the number of rows of ``e``, in place.

    Args:
        p: Mutable sequence of path values (NumPy array or list); modified
            in place.
        e: Array whose first dimension provides the upper bound.

    Returns:
        The same object ``p``, with every value larger than ``e.shape[0]``
        replaced by ``e.shape[0]``. Smaller values are left untouched.
    """
    ceiling = e.shape[0]
    if isinstance(p, np.ndarray):
        # Vectorised in-place clamp for the usual array input.
        p[p > ceiling] = ceiling
        return p
    # Fallback for plain mutable sequences such as lists.
    for idx, value in enumerate(p):
        if value > ceiling:
            p[idx] = ceiling
    return p
# In[13]:
# Build the training set from the first `no_of_notes` collected pairs.
# Each sample is a (lookback x lookup) window cut along the diagonal of the
# positionally-encoded distance matrix; its target is the path value at the
# column just past the window.
lookback = 100
lookup = 100
no_of_notes = 5 # for all 1000000
counter = 0
# Fallback values for the case where no pair is available.
train_dataX = np.array([])
train_dataY = np.array([])
window_chunks = []
target_chunks = []
for e, p in zip(el, pl):
    p = norm_p(p, e)
    # Zero-pad the rows to get a square matrix (assumes rows <= columns,
    # which is how get_matrix orients its result).
    padded = np.zeros((e.shape[1], e.shape[1]))
    padded[:e.shape[0], :e.shape[1]] = e
    e = padded
    pe = get_pe(e)
    e = np.add(e, pe)
    scaler1 = MinMaxScaler(feature_range=(-1, 1))
    scaler2 = MinMaxScaler(feature_range=(-1, 1))
    dataX = np.array([
        np.transpose(e[i:i+lookup, i:i+lookback])
        for i in range(e.shape[1] - lookback)
    ])
    dataY = np.array(p[lookback:])
    # Normalization: inputs and targets are scaled to [-1, 1] per pair.
    dataX = scaler2.fit_transform(dataX.reshape(-1, dataX.shape[-1])).reshape(dataX.shape)
    dataY = scaler1.fit_transform(dataY.reshape(-1, 1))
    counter += 1
    # NOTE: the limit is checked after processing, so the pair that trips it
    # is computed but not added; its `e`, `pe` and `dataX` stay behind as
    # globals that later cells read.
    if no_of_notes < counter:
        break
    window_chunks.append(dataX)
    target_chunks.append(dataY)
# Concatenate once instead of re-copying the growing array for every pair.
if window_chunks:
    train_dataX = np.concatenate(window_chunks, 0)
    train_dataY = np.concatenate(target_chunks, 0)
# In[14]:
# Show the positional encoding of the most recently processed pair.
fig, ax = plt.subplots(figsize=(12, 12))
plt.imshow(pe, origin='lower', aspect='auto')
plt.title('Positional encoding for distance matrix (max value:0.5, min value:-0.5)')
# plt.savefig('pe')
# In[15]:
# Values of the encoding along the main diagonal.
t = [pe[x, x] for x in range(pe.shape[0])]
plt.plot(t)
# In[16]:
# Distance matrix after the positional encoding has been added.
fig, ax = plt.subplots(figsize=(12, 12))
plt.imshow(e, origin='lower', aspect='auto')
plt.title('Distance matrix with positional encoding')
# plt.savefig('e+pe')
# ## Data loader
# In[17]:
# Sequential 90 / 5 / 5 split of the assembled samples.
train_percent = 0.90 # train set percent
validate_percent = 0.05 # validation set percent
n_samples = train_dataX.shape[0]
train = int(train_percent * n_samples)  # end index of the training slice
validation = train + int(validate_percent * n_samples)  # end index of the validation slice
train_dataX = torch.from_numpy(train_dataX).type(torch.Tensor)
train_dataY = torch.from_numpy(train_dataY).type(torch.Tensor)
# In[18]:
x_train, y_train = train_dataX[0:train], train_dataY[0:train]
x_valid, y_valid = train_dataX[train:validation], train_dataY[train:validation]
x_test, y_test = train_dataX[validation:], train_dataY[validation:]
# In[19]:
x_train.shape, x_valid.shape, x_test.shape,
# ### train-validate-test data loaders
# In[20]:
# NOTE: from here on `train` is the training dataset, no longer the split index.
train = torch.utils.data.TensorDataset(x_train, y_train)
train_loader = torch.utils.data.DataLoader(
    dataset=train, batch_size=32, shuffle=True)  # shuffle is important
valid = torch.utils.data.TensorDataset(x_valid, y_valid)
valid_loader = torch.utils.data.DataLoader(
    dataset=valid, batch_size=32, shuffle=False)
test = torch.utils.data.TensorDataset(x_test, y_test)
test_loader = torch.utils.data.DataLoader(
    dataset=test, batch_size=32, shuffle=False)
# ## Pytorch model
# ### Hyper parameters
# In[21]:
# Feature size per time step, read from the assembled training tensor rather
# than from a variable left over from the data-building loop.
input_dim = train_dataX.shape[-1]
hidden_dim = 128  # hidden size of the LSTM
num_layers = 2    # number of stacked LSTM layers
output_dim = 1    # one regression target per sample
lr = 0.0005       # learning rate passed to the Adam optimiser
epochs = 30       # number of passes over the training loader
# ### LSTM model
# In[22]:
class LSTM(nn.Module):
    """Stacked LSTM followed by a two-layer fully connected regression head.

    Args:
        input_dim: Number of features per time step.
        hidden_dim: Hidden size of every LSTM layer.
        num_layers: Number of stacked LSTM layers.
        output_dim: Size of the final output vector.
    """

    def __init__(self, input_dim, hidden_dim, num_layers, output_dim):
        super().__init__()
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        # batch_first=True: inputs are (batch, time, features).
        self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_dim, 64)
        self.fc2 = nn.Linear(64, output_dim)

    def forward(self, x):
        """Map a batch of sequences to one prediction per sequence.

        Args:
            x: Tensor of shape (batch, time, input_dim).

        Returns:
            Tensor of shape (batch, output_dim).
        """
        # nn.LSTM uses zero initial hidden/cell states when none are given,
        # created on the input's device and dtype, so the model also works
        # off-CPU and in other precisions.
        out, _ = self.lstm(x)
        # Keep only the top layer's output at the last time step.
        out = out[:, -1, :]
        out = F.relu(self.fc(out))
        return self.fc2(out)
# Model, loss and optimiser.
model = LSTM(input_dim=input_dim, hidden_dim=hidden_dim, output_dim=output_dim, num_layers=num_layers)
loss_fn = torch.nn.MSELoss()
optimiser = torch.optim.Adam(model.parameters(), lr=lr)
# In[23]:
# Train for `epochs` passes. Every second epoch the model is scored on the
# validation loader and the mean training loss of that epoch is recorded in
# `total_loss`.
total_loss = []
for x in tqdm(range(epochs)):
    batch_loss = []
    for X, Y in train_loader:
        optimiser.zero_grad()
        # Forward pass
        y_train_pred = model(X)
        loss = loss_fn(y_train_pred, Y)
        # Backward pass
        loss.backward()
        # Update parameters
        optimiser.step()
        batch_loss.append(loss.item())
    if x % 2 == 0:
        pred = []
        real = []
        with torch.no_grad():
            for X, Y in valid_loader:
                pred.append(model(X)[:, 0].numpy())
                real.append(Y[:, 0].numpy())
        pred = np.concatenate(pred)
        real = np.concatenate(real)
        # Square root of the MSE, i.e. the validation RMSE.
        Score = math.sqrt(mean_squared_error(real, pred))
        epoch_loss = np.mean(batch_loss)
        total_loss.append(epoch_loss)
        print("Training loss: ", epoch_loss, "RMSE validation:", Score)
#
# In[24]:
# Run the trained model over the training set in its original order and
# compare predictions with the targets.
stats, reals, predicteds = [], [], []
# Rebuilt without shuffling so samples come out in dataset order.
train_loader = torch.utils.data.DataLoader(
    dataset=train, batch_size=32, shuffle=False)
with torch.no_grad():
    count = 0
    for count, (X, Y) in enumerate(tqdm(train_loader), start=1):
        # forward
        pre = model(X)
        target_col = Y[:, 0]
        predicted_col = pre[:, 0]
        predicteds.extend(predicted_col)
        reals.extend(target_col)
        stats.extend(target_col - predicted_col)  # per-sample residuals
# In[25]:
fig, ax = plt.subplots(figsize=(20, 6))
plt.plot(reals, label='real')
plt.plot(predicteds, label='pre')
plt.legend()
# ### Testing
# In[26]:
# Build the held-out samples from the pairs that follow the first
# `no_of_notes` ones (those were consumed by the training cell).
lookback = 100
lookup = 100
no_of_notes = 5 # for all 1000000
counter = 0
held_out = []
for e, p in zip(el, pl):
    if no_of_notes > counter:
        counter += 1
        continue
    p = norm_p(p, e)
    # Zero-pad the rows to get a square matrix (assumes rows <= columns,
    # which is how get_matrix orients its result).
    padded = np.zeros((e.shape[1], e.shape[1]))
    padded[:e.shape[0], :e.shape[1]] = e
    e = padded
    pe = get_pe(e)
    e = np.add(e, pe)
    dataX = np.array([
        np.transpose(e[i:i+lookup, i:i+lookback])
        for i in range(e.shape[1] - lookback)
    ])
    dataY = np.array(p[lookback:])
    held_out.append((dataX, dataY))
# The per-pair arrays have different shapes, so store them in an explicit
# object array of shape (n_pairs, 2); letting np.array infer a shape from the
# ragged nesting raises on recent NumPy releases.
data = np.empty((len(held_out), 2), dtype=object)
for row, (windows, targets) in enumerate(held_out):
    data[row, 0] = windows
    data[row, 1] = targets
# In[27]:
data.shape
# In[28]:
# Overlay the path on the encoded distance matrix of the last pair visited.
fig, ax = plt.subplots(figsize=(12, 12))
print(e.shape)
ax.plot(p)
plt.imshow(e, origin='lower', aspect='auto')
# In[29]:
def moving_average(x, w):
    """Return the mean of every full window of ``w`` consecutive values of ``x``.

    Only positions where the window fits entirely inside ``x`` are produced
    ('valid' convolution), so the result has ``len(x) - w + 1`` entries when
    ``len(x) >= w``.
    """
    box = np.ones(w)
    window_sums = np.convolve(x, box, mode='valid')
    return window_sums / w
# For every held-out pair: scale it, run the model 10 times per batch, keep
# the predictions whose spread across the runs is below `threshold`, and plot
# them against the ground truth.
z = 1
threshold = 3e-8
for dataX, dataY in data:
    scaler = MinMaxScaler(feature_range=(-1, 1))
    scaler2 = MinMaxScaler(feature_range=(-1, 1))
    Y = scaler.fit_transform(dataY.reshape(-1, 1))
    dataX = scaler2.fit_transform(dataX.reshape(-1, dataX.shape[-1])).reshape(dataX.shape)
    X_train = torch.from_numpy(dataX).type(torch.Tensor)
    Y_train = torch.from_numpy(Y).type(torch.Tensor)
    train = torch.utils.data.TensorDataset(X_train, Y_train)
    # Order must be preserved: the running sample index is the x coordinate.
    train_next_loader = torch.utils.data.DataLoader(dataset=train,
                                                    batch_size=32,
                                                    shuffle=False)
    std = []
    reals = []
    predicteds = []
    with torch.no_grad():
        i = 0  # index of the first sample of the current batch
        for X, Y in tqdm(train_next_loader):
            # NOTE(review): the network defined in this file has no dropout
            # layer, so the repeated passes are expected to agree -- confirm
            # whether stochastic inference was intended.
            temp = np.array([model(X).reshape(-1).numpy() for _ in range(10)])
            # Computed once per batch rather than once per sample.
            batch_std = np.std(temp, 0)
            batch_mean = np.mean(temp, 0)
            std.extend(batch_std)
            for s_error_i in np.flatnonzero(batch_std < threshold):
                predicteds.append([s_error_i + i, batch_mean[s_error_i]])
            reals.extend(Y[:, 0].numpy())
            i += X.shape[0]
    # reshape keeps the column indexing below valid when nothing was kept.
    predicteds = np.array(predicteds, dtype=float).reshape(-1, 2)
    fig, ax = plt.subplots(figsize=(12, 12))
    plt.plot(reals, linewidth=3, label='real')
    plt.scatter(predicteds[:, 0], predicteds[:, 1], s=10, c='green', alpha=0.5, label='pre',)
    window = 5
    if len(predicteds) >= window:
        # A 'valid' moving average has len - window + 1 points.
        plt.plot(predicteds[:len(predicteds) - window + 1, 0],
                 moving_average(predicteds[:, 1], window),
                 linewidth=2, label='moving avg pre')
    plt.title(f'Ground truth v. predicted alignment with moving average for test pair #{z}')
    plt.ylabel('Scaled location in the score')
    plt.xlabel('Performance audio')
    plt.legend()
    z += 1
#
# In[30]:
# We used very limited data for training here. Training on more data will give better results.
# In[31]:
# Stay tuned for further optimizations!
# In[ ]: