from orangecontrib.recommendation.ranking import Learner, Model
from orangecontrib.recommendation.utils.format_data import *
from orangecontrib.recommendation.optimizers import *
from orangecontrib.recommendation.utils.datacaching import cache_rows
from collections import defaultdict
from scipy.special import expit as sigmoid
import numpy as np
import time
import warnings
__all__ = ['CLiMFLearner']
__sparse_format__ = lil_matrix
def _g(x):
"""sigmoid function"""
return sigmoid(x)
def _dg(x):
ex = np.exp(-x)
y = ex / (1 + ex) ** 2
return y
def _matrix_factorization(ratings, shape, num_factors, num_iter, learning_rate,
lmbda, optimizer, verbose=False, random_state=None,
callback=None):
# Seed the generator
if random_state is not None:
np.random.seed(random_state)
# Get featured matrices dimensions
num_users, num_items = shape
# Initialize low-rank matrices
U = 0.01 * np.random.rand(num_users, num_factors) # User-feature matrix
V = 0.01 * np.random.rand(num_items, num_factors) # Item-feature matrix
# Configure optimizer
update_ui = create_opt(optimizer, learning_rate).update
update_vw = create_opt(optimizer, learning_rate).update
# Cache rows
users_cached = defaultdict(list)
# Print information about the verbosity level
if verbose:
print('CLiMF factorization started.')
print('\tLevel of verbosity: ' + str(int(verbose)))
print('\t\t- Verbosity = 1\t->\t[time/iter]')
print('\t\t- Verbosity = 2\t->\t[time/iter, loss]')
print('\t\t- Verbosity = 3\t->\t[time/iter, loss, MRR]')
print('')
# Prepare sample of users
if verbose > 2:
queries = None
num_samples = min(num_users, 1000) # max. number to sample
users_sampled = np.random.choice(np.arange(num_users), num_samples)
# Catch warnings
with warnings.catch_warnings():
# Turn matching warnings into exceptions
warnings.filterwarnings('error')
try:
# Factorize matrix using SGD
for step in range(num_iter):
if verbose:
start = time.time()
print('- Step: %d' % (step + 1))
# Send information about the process
if callback:
callback(step + 1)
# Optimize rating prediction
for i in range(len(U)):
dU = -lmbda * U[i]
# Precompute f (f[j] = <U[i], V[j]>)
items = cache_rows(ratings, i, users_cached)
f = np.einsum('j,ij->i', U[i], V[items])
for j in range(len(items)): # j=items
w = items[j]
dV = _g(-f[j]) - lmbda * V[w]
# For I
vec1 = _dg(f[j] - f) * \
(1 / (1 - _g(f - f[j])) - 1 / (1 - _g(f[j] - f)))
dV += np.einsum('i,j->ij', vec1, U[i]).sum(axis=0)
update_vw(-dV, V, w)
dU += _g(-f[j]) * V[w]
# For II
vec2 = (V[items[j]] - V[items])
vec3 = _dg(f - f[j]) / (1 - _g(f - f[j]))
dU += np.einsum('ij,i->ij', vec2, vec3).sum(axis=0)
update_ui(-dU, U, i)
# Print process
if verbose:
print('\t- Time: %.3fs' % (time.time() - start))
if verbose > 1:
# Set parameters and compute loss
low_rank_matrices = (U, V)
params = lmbda
objective = compute_loss(ratings, low_rank_matrices, params)
print('\t- Training loss: %.3f' % objective)
if verbose > 2:
model = CLiMFModel(U=U, V=V)
mrr, queries = \
model.compute_mrr(ratings, users_sampled, queries)
print('\t- Train MRR: %.4f' % mrr)
print('')
except RuntimeWarning:
callback(num_iter) if callback else None
raise RuntimeError('Training diverged and returned NaN.')
return U, V
def compute_loss(data, low_rank_matrices, params):
# Set parameters
ratings = data
U, V = low_rank_matrices
lmbda = params
# Check data type
if isinstance(ratings, __sparse_format__):
pass
elif isinstance(ratings, Table):
# Preprocess Orange.data.Table and transform it to sparse
ratings, order, shape = preprocess(ratings)
ratings = table2sparse(ratings, shape, order, m_type=__sparse_format__)
else:
raise TypeError('Invalid data type')
# Cache rows
users_cached = defaultdict(list)
F = -0.5*lmbda*(np.sum(U*U)+np.sum(V*V))
for i in range(len(U)):
# Precompute f (f[j] = <U[i], V[j]>)
items = cache_rows(ratings, i, users_cached)
f = np.einsum('j,ij->i', U[i], V[items])
for j in range(len(items)): # j=items
F += np.log(_g(f[j]))
F += np.log(1 - _g(f - f[j])).sum(axis=0) # For I
return F
[docs]class CLiMFLearner(Learner):
"""CLiMF: Collaborative Less-is-More Filtering Matrix Factorization
This model uses stochastic gradient descent to find two low-rank
matrices: user-feature matrix and item-feature matrix.
CLiMF is a matrix factorization for scenarios with binary relevance data
when only a few (k) items are recommended to individual users. It improves top-k
recommendations through ranking by directly maximizing the Mean Reciprocal
Rank (MRR).
Attributes:
num_factors: int, optional
The number of latent factors.
num_iter: int, optional
The number of passes over the training data (aka epochs).
learning_rate: float, optional
The learning rate controlling the size of update steps (general).
lmbda: float, optional
Controls the importance of the regularization term (general).
Avoids overfitting by penalizing the magnitudes of the parameters.
optimizer: Optimizer, optional
Set the optimizer for SGD. If None (default), classical SGD will be
applied.
verbose: boolean or int, optional
Prints information about the process according to the verbosity
level. Values: False (verbose=0), True (verbose=1) and INTEGER
random_state: int, optional
Set the seed for the numpy random generator, so it makes the random
numbers predictable. This a debbuging feature.
callback: callable
"""
name = 'CLiMF'
def __init__(self, num_factors=5, num_iter=25, learning_rate=0.0001,
lmbda=0.001, preprocessors=None, optimizer=None, verbose=False,
random_state=None, callback=None):
self.num_factors = num_factors
self.num_iter = num_iter
self.learning_rate = learning_rate
self.lmbda = lmbda
self.optimizer = SGD() if optimizer is None else optimizer
self.random_state = random_state
self.callback = callback
super().__init__(preprocessors=preprocessors, verbose=verbose)
[docs] def fit_storage(self, data):
"""Fit the model according to the given training data.
Args:
data: Orange.data.Table
Returns:
self: object
Returns self.
"""
# Prepare data
data = super().prepare_fit(data)
# Check convergence
if self.learning_rate == 0:
warnings.warn("With learning_rate=0, this algorithm does not "
"converge well.", stacklevel=2)
# Transform ratings matrix into a sparse matrix
data = table2sparse(data, self.shape, self.order,
m_type=__sparse_format__)
# Factorize matrix
U, V = _matrix_factorization(ratings=data, shape=self.shape,
num_factors=self.num_factors,
num_iter=self.num_iter,
learning_rate=self.learning_rate,
lmbda=self.lmbda, optimizer=self.optimizer,
verbose=self.verbose,
random_state=self.random_state,
callback=self.callback)
# Construct model
model = CLiMFModel(U=U, V=V)
return super().prepare_model(model)
class CLiMFModel(Model):
def __init__(self, U, V):
self.U = U
self.V = V
super().__init__()
def predict(self, X, top_k=None):
"""Perform predictions on samples in X for all items.
Args:
X: array, optional
Array with the indices of the users to which make the
predictions. If None (default), predicts for all users.
top_k: int, optional
Returns the k-first predictions. (Do not confuse with
'top-best').
Returns:
C: ndarray, shape = (n_samples, n_items)
Returns predicted values. A matrix (U, I) with the indices of
the items recommended, sorted by ascending ranking. (1st better
than 2nd, than 3rd,...)
"""
# Check if is an array
if isinstance(X, np.ndarray):
if X.ndim != 1:
X = X[:, self.order[0]]
elif not isinstance(X, int): # Check if is not an int
raise TypeError("'Invalid input data. Array or 'int'")
# Compute scores
predictions = np.dot(self.U[X], self.V.T)
# Return indices of the sorted predictions
# (Return top-k recommendations, optional)
if isinstance(X, int): # Array of 1D
predictions = np.argsort(predictions)
predictions = predictions[::-1]
predictions = predictions[:top_k] if top_k else predictions
else:
predictions = np.argsort(predictions, axis=1)
predictions = np.fliplr(predictions)
predictions = predictions[:, :top_k] if top_k else predictions
return predictions
def getUTable(self):
variable = self.original_domain.variables[self.order[0]]
return feature_matrix(variable, self.U)
def getVTable(self):
variable = self.original_domain.variables[self.order[1]]
return feature_matrix(variable, self.V)