- Published on
(Implicit) Matrix FactorizationをCythonで高速化する
- Authors
- Name
- Minato Sato
- @minatosatou
こんにちは、さとうみなとです。CythonでMatrix Factorizationの実装を行ったときのメモです。
はじめに
ユーザーがアイテムに対して何らかの評価を与えた行列を考えたときに、★1~5のように陽に評価値が与えられたものをExplicit Feedback、そうでないもので購入やクリックの有無(positive feedbackとnegative feedbackの2値)が与えられたものをImplicit Feedbackと言います。negative feedbackについてはユーザーがアイテムに対してnegativeな反応があったというだけではなく、単純にそのアイテムを認知していないものも含むため注意が必要です。今回の実装はこうしたImplicit Feedbackの状況を想定したMatrix Factorizationについて扱っていきたいと思います。
Weighted Matrix Factorization
概要
Implicit Feedbackのデータに対するMatrix Factorizationの最もスタンダードな方法としてWeighted Matrix Factorization (WMF)があります。 WMFはnegative feedbackについて重みを小さくするという非常にシンプルなヒューリスティクスな方法を考えます。 具体的な損失関数は下記のような重み付き2乗誤差となっています。
ここで、confidence は となるように設定します。
その他のnotationは以前の記事と同じです。
更新式
SGDで学習する際の更新式は下記の通り。(微分して勾配を求めてください)
データセット
今回はMovieLens 100K Datasetのうち、評価が★4以上のものをpositive feedback、それ以外のもの(i.e. ★3以下と評価なし)をnegative feedbackとして扱います。実装時には以前も紹介したlightfm
の力を借りて
from lightfm.datasets import fetch_movielens
data = fetch_movielens(min_rating=4.0)
Y_train = data["train"].toarray()
Y_train[Y_train > 0] = 1.0
のようにしてnumpyの形式で使います。
評価
今回はsklearn
の力を借りて(ROCの)AUCで評価します。(ちゃんと学習できているかの確認程度)
NumPyによる実装
mini-batchとか使わずに1つ1つのサンプルを使って学習させると下記のような感じ。
from itertools import product
import lightfm
import numpy as np
from lightfm.datasets import fetch_movielens
from sklearn import metrics
from sklearn.utils import shuffle
data = fetch_movielens(min_rating=4.0)
# confidence c。
c_1 = 10.
c_0 = 1.
# 学習回数
num_epochs = 3
# 潜在ベクトルの次元
K = 20
# U ユーザー数
# I アイテム数
U, I = data["train"].shape
# SGDの学習率
learning_rate = 1e-2
# 正則化係数
lam = 1e-3
Y_train = data["train"].toarray()
Y_train[Y_train > 0] = 1.0
Y_test = data["test"].toarray()
Y_test[Y_test > 0] = 1.0
users, items = zip(*product(range(U), range(I)))
users = np.array(users)
items = np.array(items)
ratings = np.array([Y_train[_u, _i] for _u, _i in zip(users, items)])
# 潜在ベクトル初期化
theta = np.random.uniform(low=-0.1, high=0.1, size=(U, K)) / K
beta = np.random.uniform(low=-0.1, high=0.1, size=(I, K)) / K
for epoch in range(num_epochs):
users, items, ratings = shuffle(users, items, ratings)
loss = []
for i in range(U*I):
predicted = theta[users[i]] @ beta[items[i]]
e = ratings[i] - predicted
c = c_1 if ratings[i] == 1. else c_0
theta_tmp = theta[users[i]].copy()
beta_tmp = beta[items[i]].copy()
theta[users[i]] += learning_rate * (beta_tmp * e * c - lam * theta_tmp)
beta[items[i]] += learning_rate * (theta_tmp * e * c - lam * beta_tmp )
loss.append((e*c)**2 + lam*(theta_tmp**2).sum() + lam*(beta_tmp**2).sum())
print(np.mean(loss))
predicted = theta.dot(beta.T)
scores = np.zeros(U)
for u in range(U):
if len(set(Y_test[u])) != 1:
fpr, tpr, thresholds = metrics.roc_curve(Y_test[u], predicted[u])
scores[u] = metrics.auc(fpr, tpr)
else:
scores[u] = 0.0
print(f"test mean auc: {scores.mean()}")
実行環境にもよりますが、自分の環境では下記の通り2分以上かかりました。
In [1]: %time run mf
epoch 1: loss=0.2557801751819014
epoch 2: loss=0.13357901588200438
epoch 3: loss=0.1294046242539832
test mean auc: 0.8768254588999452
CPU times: user 2min 23s, sys: 630 ms, total: 2min 24s
Wall time: 2min 23s
Cythonによる実装
基本的にループ処理とPyObject同士の演算について気をつければかなりの高速化が図れます。具体的には、全ての変数の型を宣言する、numpy.ndarray
で保持していたデータは全てmemoryview(e.g. double[:,:])
として保持する等です。numpy.ndarray
として保持しないので、NumPyで実装した際には
predicted = theta[users[i]] @ beta[items[i]]
としていた部分などは、@ (numpy.dot)
が使えないので
cdef int k
cdef double predicted
...
predicted = 0.0
for k in range(K):
predicted += theta[users[i], k] * beta[items[i], k]
のように書く必要があります。
実装の全体は下記の通りで、仮にmf_cy.pyx
という名前で保存します。
# cython: language_level=3
# distutils: language=c++
from itertools import product
import cython
import numpy as np
cimport numpy as np
from sklearn.utils import shuffle
cdef inline double square(double x) nogil:
return x * x
class WeightedMatrixFactorization(object):
def __init__(self, int num_components = 20, double learning_rate = 0.01, double weight = 10., double weight_decay = 0.001):
self.num_components = num_components
self.weight = weight
self.weight_decay = weight_decay
self.learning_rate = learning_rate
def fit(self, np.ndarray Y, int num_epochs = 3):
cdef double[:,:] Y_train = Y.astype(np.float64)
cdef int U = Y.shape[0]
cdef int I = Y.shape[1]
cdef int K = self.num_components
# 潜在ベクトル初期化
self.theta = np.random.uniform(low=-0.1, high=0.1, size=(U, K)).astype(np.float64) / K
self.beta = np.random.uniform(low=-0.1, high=0.1, size=(I, K)).astype(np.float64) / K
self.__fit(Y_train, num_epochs)
@cython.boundscheck(False)
@cython.wraparound(False)
def __fit(self, double[:,:] Y, int num_epochs):
cdef double[:,:] theta = self.theta
cdef double[:,:] beta = self.beta
cdef double lam = self.weight_decay
cdef double lr = self.learning_rate
cdef double num_components = self.num_components
cdef int U = Y.shape[0]
cdef int I = Y.shape[1]
cdef int K = self.num_components
cdef double c, c_1, c_0
cdef int[:] users, items
cdef double[:] ratings
cdef int epoch, i, k
cdef double predicted
cdef double[:] theta_tmp = np.zeros(K)
cdef double[:] beta_tmp = np.zeros(K)
cdef double e
cdef double[:] loss = np.zeros(U*I)
c_1 = self.weight
c_0 = 1.0
_users, _items = zip(*product(range(U), range(I)))
_users = np.array(_users).astype(np.int32)
_items = np.array(_items).astype(np.int32)
_ratings = np.array([Y[_u, _i] for _u, _i in zip(_users, _items)])
users, items, ratings = shuffle(_users, _items, _ratings)
for epoch in range(num_epochs):
for i in range(U*I):
loss[i] = 0.0
predicted = 0.0
for k in range(K):
predicted += theta[users[i], k] * beta[items[i], k]
e = ratings[i] - predicted
c = c_1 if ratings[i] == 1. else c_0
for k in range(K):
theta_tmp[k] = theta[users[i], k]
beta_tmp[k] = beta[items[i], k]
for k in range(K):
theta[users[i], k] += lr * (beta_tmp[k] * e * c - lam * theta_tmp[k])
beta[items[i], k] += lr * (theta_tmp[k] * e * c - lam * beta_tmp[k] )
loss[i] += lam*(square(theta_tmp[k]) + square(beta_tmp[k]))
loss[i] += square(e)*c
print(f"epoch {epoch+1}: loss={np.mean(loss)}")
setup.py
は下記の通りで、最終行でCythonで実装したファイル(.pyx
)を指定します。
import os
import numpy as np
from Cython.Build import cythonize
from distutils.core import setup
cmpl_args = ['-O3']
lnk_args = []
os.environ['CFLAGS'] = " ".join(cmpl_args + lnk_args)
os.environ['CXXFLAGS'] = " ".join(cmpl_args + lnk_args)
setup(ext_modules=cythonize(["mf_cy.pyx"]), include_dirs= [np.get_include()])
コンパイルは
python setup.py build_ext --inplace
で実行でき、成功すると同じディレクトリにmf_cy.cpython-38-darwin.so
のような名前のファイルができているはずです。これを指定するとpython側でimportできるようになります。具体例は下記の通りで、
from itertools import product
import numpy as np
from lightfm.datasets import fetch_movielens
from sklearn import metrics
data = fetch_movielens(min_rating=4.0)
U, I = data["train"].shape
Y_train = data["train"].toarray()
Y_train[Y_train > 0] = 1.0
Y_test = data["test"].toarray()
Y_test[Y_test > 0] = 1.0
from mf_cy import WeightedMatrixFactorization
model = WeightedMatrixFactorization(
num_components=20, learning_rate=0.01, weight=10., weight_decay=1e-3)
model.fit(Y_train)
predicted = model.theta.dot(model.beta.T)
scores = np.zeros(U)
for u in range(U):
if len(set(Y_test[u])) != 1:
fpr, tpr, thresholds = metrics.roc_curve(Y_test[u], predicted[u])
scores[u] = metrics.auc(fpr, tpr)
else:
scores[u] = 0.0
print(f"test mean auc: {scores.mean()}")
実際に実行した結果は
In [1]: %time run mf_cy_ml100k.py
epoch 1: loss=0.2586439797444402
epoch 2: loss=0.13372843259177666
epoch 3: loss=0.1282534501027116
test mean auc: 0.8765697084700025
CPU times: user 3.77 s, sys: 241 ms, total: 4.01 s
Wall time: 3.43 s
となり、2分20秒程度かかっていたのがわずか3.4秒になっています!
さいごに
今回は学習のloop処理の中についてnumpy.ndarray
をmemoryview
に置き換えることによって高速化を行いました。実はHogwild(lock-freeな非同期なSGD)を使えば(GloVe公式実装等はこれです)更に高速化することができますが、紹介はまた今度の機会にしたいと思います。 数式やコード、説明に間違いございましたらTwitterかコメント欄までお願い致します。