Please refer to the article at http://www.albertauyeung.com/post/python-matrix-factorization/ for the detailed explanation.
%matplotlib inline
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
class MF():
def __init__(self, R, K, alpha, beta, iterations):
"""
Perform matrix factorization to predict empty
entries in a matrix.
Arguments
- R (ndarray) : user-item rating matrix
- K (int) : number of latent dimensions
- alpha (float) : learning rate
- beta (float) : regularization parameter
"""
self.R = R
self.num_users, self.num_items = R.shape
self.K = K
self.alpha = alpha
self.beta = beta
self.iterations = iterations
def train(self):
# Initialize user and item latent feature matrice
self.P = np.random.normal(scale=1./self.K, size=(self.num_users, self.K))
self.Q = np.random.normal(scale=1./self.K, size=(self.num_items, self.K))
# Initialize the biases
self.b_u = np.zeros(self.num_users)
self.b_i = np.zeros(self.num_items)
self.b = np.mean(self.R[np.where(self.R != 0)])
# Create a list of training samples
self.samples = [
(i, j, self.R[i, j])
for i in range(self.num_users)
for j in range(self.num_items)
if self.R[i, j] > 0
]
# Perform stochastic gradient descent for number of iterations
training_process = []
for i in range(self.iterations):
np.random.shuffle(self.samples)
self.sgd()
mse = self.mse()
training_process.append((i, mse))
if (i+1) % 10 == 0:
print("Iteration: %d ; error = %.4f" % (i+1, mse))
return training_process
def mse(self):
"""
A function to compute the total mean square error
"""
xs, ys = self.R.nonzero()
predicted = self.full_matrix()
error = 0
for x, y in zip(xs, ys):
error += pow(self.R[x, y] - predicted[x, y], 2)
return np.sqrt(error)
def sgd(self):
"""
Perform stochastic graident descent
"""
for i, j, r in self.samples:
# Computer prediction and error
prediction = self.get_rating(i, j)
e = (r - prediction)
# Update biases
self.b_u[i] += self.alpha * (e - self.beta * self.b_u[i])
self.b_i[j] += self.alpha * (e - self.beta * self.b_i[j])
# Create copy of row of P since we need to update it but use older values for update on Q
P_i = self.P[i, :][:]
# Update user and item latent feature matrices
self.P[i, :] += self.alpha * (e * self.Q[j, :] - self.beta * self.P[i,:])
self.Q[j, :] += self.alpha * (e * P_i - self.beta * self.Q[j,:])
def get_rating(self, i, j):
"""
Get the predicted rating of user i and item j
"""
prediction = self.b + self.b_u[i] + self.b_i[j] + self.P[i, :].dot(self.Q[j, :].T)
return prediction
def full_matrix(self):
"""
Computer the full matrix using the resultant biases, P and Q
"""
return mf.b + mf.b_u[:,np.newaxis] + mf.b_i[np.newaxis:,] + mf.P.dot(mf.Q.T)
R = np.array([
[5, 3, 0, 1],
[4, 0, 0, 1],
[1, 1, 0, 5],
[1, 0, 0, 4],
[0, 1, 5, 4],
])
mf = MF(R, K=2, alpha=0.1, beta=0.01, iterations=20)
training_process = mf.train()
print()
print("P x Q:")
print(mf.full_matrix())
print()
print("Global bias:")
print(mf.b)
print()
print("User bias:")
print(mf.b_u)
print()
print("Item bias:")
print(mf.b_i)
Iteration: 2 ; error = 4.8575 Iteration: 12 ; error = 0.1299 P x Q: [[ 4.98421651 2.99708195 3.12289161 1.00971487] [ 3.99564826 2.1268789 3.50941346 1.01124379] [ 1.01613916 0.98386613 4.5843299 4.9908544 ] [ 0.99513504 0.75011143 3.73365012 3.98862199] [ 1.61407605 1.02983448 4.98695019 3.99475012]] Global bias: 2.76923076923 User bias: [ 0.1810972 -0.0346873 0.11877163 -0.44968314 0.29954531] Item bias: [ 0.34310914 -0.81457993 0.851025 -0.22669804]
x = [x for x, y in training_process]
y = [y for x, y in training_process]
plt.figure(figsize=((16,4)))
plt.plot(x, y)
plt.xticks(x, x)
plt.xlabel("Iterations")
plt.ylabel("Mean Square Error")
plt.grid(axis="y")