Spark ML cookbook (Python)

Spark ML cookbook (Python)

Calculate percentage of sparsity of a user-item rating DataFrame

result = ratingDF.agg(F.count('rating'), F.countDistinct('user'), F.countDistinct('item')).collect()[0]
totalUserCount = result['count(DISTINCT user)']
totalItemCount = result['count(DISTINCT item)']
zonZeroRatingCount = result['count(rating)']

density = (zonZeroRatingCount / (totalUserCount * totalItemCount)) * 100
sparsity = 100 - density

Recommend items for single user

topN = 30
userID = 123

userItemsDF = alsModel \
    .itemFactors. \
    selectExpr('{0} AS user'.format(userID), 'id AS item')
userPredictedDF = alsModel \
    .transform(userItemsDF) \
    .select('item', 'prediction') \
    .orderBy('prediction', ascending=False) \
    .limit(topN)

ref:
https://www.safaribooksonline.com/library/view/advanced-analytics-with/9781491972946/ch03.html
https://www.safaribooksonline.com/library/view/building-a-recommendation/9781785282584/ch06s04.html
https://www.safaribooksonline.com/library/view/building-recommendation-engines/9781785884856/ch07s05.html

Recommend items for every user (a slightly fast way)

import numpy
from numpy import *

myModel = MatrixFactorizationModel.load(sc, 'BingBong')
m1 = myModel.productFeatures()
m2 = m1.map(lambda (product, feature): feature).collect()
m3 = matrix(m2).transpose()
pf = sc.broadcast(m3)
uf = myModel.userFeatures().coalesce(100)

# get predictions on all user
f1 = uf.map(lambda (userID, features): (userID, squeeze(asarray(matrix(array(features)) * pf.value))))

ref:
https://www.slideshare.net/SparkSummit/26-trillion-app-recomendations-using-100-lines-of-spark-code-ayman-farahat

Evaluate a binary classification

from pyspark.ml.evaluation import BinaryClassificationEvaluator

matrix = [
    (0.5, 1),
    (2.0, 1),
    (0.8, 1),
    (0.2, 0),
    (0.1, 0),
    (0.4, 0),
]
predictions = spark.createDataFrame(matrix, ['prediction', 'label'])
predictions = predictions.withColumn('prediction', predictions['prediction'].cast('double'))

evaluator = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='label', metricName='areaUnderROC')
evaluator.evaluate(predictions)

Calculate ranking metrics

from pyspark.mllib.evaluation import RankingMetrics
from pyspark.sql import Window
from pyspark.sql.functions import col, expr
import pyspark.sql.functions as F

k = 10

windowSpec = Window.partitionBy('user').orderBy(col('prediction').desc())
perUserPredictedItemsDF = outputDF \
    .select('user', 'item', 'prediction', F.rank().over(windowSpec).alias('rank')) \
    .where('rank <= {0}'.format(k)) \
    .groupBy('user') \
    .agg(expr('collect_list(item) as items'))
perUserPredictedItemsDF.show()
# +--------+--------------------+
# |    user|               items|
# +--------+--------------------+
# |    2142|[36560369, 197450...|
# |   47217|[40693501, 643554...|
# +--------+--------------------+

windowSpec = Window.partitionBy('from_user_id').orderBy(col('starred_at').desc())
perUserActualItemsDF = rawDF \
    .select('from_user_id', 'repo_id', 'starred_at', F.rank().over(windowSpec).alias('rank')) \
    .where('rank <= {0}'.format(k)) \
    .groupBy('from_user_id') \
    .agg(expr('collect_list(repo_id) as items')) \
    .withColumnRenamed('from_user_id', 'user')
# +--------+--------------------+
# |    user|               items|
# +--------+--------------------+
# |    2142|[29122050, 634846...|
# |   59990|[9820191, 8729416...|
# +--------+--------------------+

perUserItemsRDD = perUserPredictedItemsDF.join(perUserActualItemsDF, 'user') \
    .rdd \
    .map(lambda row: (row[1], row[2]))
rankingMetrics = RankingMetrics(perUserItemsRDD)

print(rankingMetrics.meanAveragePrecision)
print(rankingMetrics.precisionAt(k))
print(rankingMetrics.ndcgAt(k))

ref:
https://www.safaribooksonline.com/library/view/spark-the-definitive/9781491912201/ch19.html

Create a custom Transformer

from pyspark.ml import Transformer

class PredictionProcessor(Transformer):

    def _transform(self, predictedDF):
        nonNullDF = predictedDF.dropna(subset=['prediction', ])
        predictionDF = nonNullDF.withColumn('prediction', nonNullDF['prediction'].cast('double'))
        return predictionDF

ref:
http://stackoverflow.com/questions/32331848/create-a-custom-transformer-in-pyspark-ml

RankingMetrics

結果介於 0 ~ 1 之間,分數越大越好。

from pyspark.mllib.evaluation import RankingMetrics

predictionAndLabels = sc.parallelize([
    ([1, 2, 3, 4], [1, 2, 3, 4]),
    ([1, ], [1, 10]),
    ([6, 4, 2], [6, 2, 100, 8, 2, 55]),
])
metrics = RankingMetrics(predictionAndLabels)
metrics.ndcgAt(5)

ref:
https://spark.apache.org/docs/latest/mllib-evaluation-metrics.html#ranking-systems

Create a custom Evaluator

from pyspark import keyword_only
from pyspark.ml.evaluation import Evaluator
from pyspark.ml.param.shared import Param
from pyspark.mllib.evaluation import RankingMetrics
from pyspark.sql import Window
from pyspark.sql.functions import col
from pyspark.sql.functions import expr
import pyspark.sql.functions as F

class RankingEvaluator(Evaluator):

    @keyword_only
    def __init__(self, k=None):
        super(RankingEvaluator, self).__init__()
        self.k = Param(self, 'k', 'Top K')
        self._setDefault(k=30)
        kwargs = self.__init__._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, k=None):
        kwargs = self.setParams._input_kwargs
        return self._set(**kwargs)

    def isLargerBetter(self):
        return True

    def setK(self, value):
        self._paramMap[self.k] = value
        return self

    def getK(self):
        return self.getOrDefault(self.k)

    def _evaluate(self, outputDF):
        k = self.getK()

        windowSpec = Window.partitionBy('user').orderBy(col('prediction').desc())
        perUserPredictedItemsDF = outputDF \
            .select('user', 'item', 'prediction', F.rank().over(windowSpec).alias('rank')) \
            .where('rank <= {0}'.format(k)) \
            .groupBy('user') \
            .agg(expr('collect_list(item) as items'))

        windowSpec = Window.partitionBy('user').orderBy(col('starred_at').desc())
        perUserActualItemsDF = outputDF \
            .select('user', 'item', 'starred_at', F.rank().over(windowSpec).alias('rank')) \
            .where('rank <= {0}'.format(k)) \
            .groupBy('user') \
            .agg(expr('collect_list(item) as items'))

        perUserItemsRDD = perUserPredictedItemsDF.join(F.broadcast(perUserActualItemsDF), 'user', 'inner') \
            .rdd \
            .map(lambda row: (row[1], row[2]))
        rankingMetrics = RankingMetrics(perUserItemsRDD)
        metric = rankingMetrics.ndcgAt(k)
        return metric

Show best parameters from a CrossValidatorModel

metric_params_pairs = list(zip(cvModel.avgMetrics, cvModel.getEstimatorParamMaps()))
metric_params_pairs.sort(key=lambda x: x[0], reverse=True)
best_metric_params = metric_params_pairs[0][1]
for pair in metric_params_pairs:
    metric, params = pair
    print('metric', metric)
    for k, v in params.items():
        print(k.name, v)
    print('')
# metric 0.5273636632856705
# rank 50
# regParam 0.01
# maxIter 10
# alpha 1

ref:
https://stackoverflow.com/questions/31749593/how-to-extract-best-parameters-from-a-crossvalidatormodel
https://stackoverflow.com/questions/39529012/pyspark-get-all-parameters-of-models-created-with-paramgridbuilder

Show best parameters from a TrainValidationSplitModel

metric_params_pairs = list(zip(tvModel.validationMetrics, tvModel.getEstimatorParamMaps()))
metric_params_pairs.sort(key=lambda x: x[0], reverse=True)
for pair in metric_params_pairs:
    metric, params = pair
    print('metric', metric)
    for k, v in params.items():
        print(k.name, v)
    print('')
# metric 0.5385481418189484
# rank 50
# regParam 0.1
# maxIter 20
# alpha 1
Recommender System: Matrix Factorization

Recommender System: Matrix Factorization

Singular Value Decomposition (SVD) 奇異值分解

        item_a  item_b  item_c
user_1  2       -       -
user_2  -       1       -
user_3  3       3       -
user_4  -       2       2

對 m x n 的評分矩陣做矩陣分解,矩陣分解常用的方法之一是 SVD 奇異值分解,把評分矩陣分解成三個矩陣 U S V*。

SVD 在推荐系统中的应用
http://yanyiwu.com/work/2012/09/10/SVD-application-in-recsys.html

奇异值分解 (SVD) 原理与在降维中的应用
http://www.cnblogs.com/pinard/p/6251584.html

Alternating Least Squares (ALS) 交替最小二乘法

因為 SVD 要求矩陣是稠密的,推薦系統中的評分矩陣通常是一個很大的稀疏矩陣,所以實務上都會使用 SVD 的變種:Funk-SVD,Funk-SVD 也稱為 Latent Factor Model (LFM) 隱語意模型:把 m x n 的評分矩陣 R 分解成 U I 兩個矩陣,分別是 m x k 的 users 矩陣和 k x n 的 items 矩陣,各自都有 k 維的特徵,計算時只考慮評分不為 0 的項目。

ALS 就是求出 U I 矩陣的一種求解方法。其他的方式還有 Stochastic Gradient Descent (SGD)。

ALS 在 Spark MLlib 中的实现
http://www.csdn.net/article/2015-05-07/2824641

矩阵分解在协同过滤推荐算法中的应用
http://www.cnblogs.com/pinard/p/6351319.html

基于矩阵分解的隐因子模型
http://www.voidcn.com/blog/winone361/article/p-5031282.html

ALS for implicit feedbacks

這種模型也稱為 Weighted Regularized Matrix Factorization (WRMF)。

        item_a  item_b  item_c
user_1  1       -       -
user_2  -       1       -
user_3  1       1       -
user_4  -       1       1

符號說明:

  • Rui = User u's interaction on Item i
  • Xu = User u's vector
  • Yi = Item i's vector
  • Pui = 1 if Rui > 0 else 0
  • Cui = 1 + alpha x Rui

參數說明:

  • iteration 迭代次數
  • rank 隱含特徵數
  • lambda 正則化參數
  • alpha 置信度權重

跟 Explicit ALS 一樣,把 m x n 的隱式反饋矩陣 R 分解成 X Y 兩個矩陣,分別是 m x k 的 user latent factors 矩陣和 k x n 的 item latent factors 矩陣。不同的是,還額外引入了兩個矩陣:m x n 的 P 矩陣,binary preference,Pui 表示用戶是不是對該物品感興趣;m x n 的 C 矩陣,confidence 置信度(或者想成是 strength 強度),Cui 表示有多確定用戶對該物品感興趣。整個演算法的目標仍舊是計算出 users 和 items 矩陣,跟 Explicit ALS 的差別在於 loss function 多加入了 Cui 和 Pui,而且會考慮所有的 user-item pair,包含 missing value / non-observed value(Rui = 0)。

只要用戶對物品有過行為(Rui > 0),我們就假設用戶對該物品有偏好(Pui = 1),用戶對該物品的行為越多(Rui 越大),Cui 就會越大,表示置信度越高,假設越可信;如果用戶對該物品沒有行為(Rui = 0),就假設用戶對該物品沒有偏好(Pui = 0),但是因為 Cui = 1 + alpha x 0,所以這個假設相對來說置信度較低,比較不可信。

Explicit ALS 是要預測用戶對某個物品的評分;Implicit ALS 則是要預測用戶會不會對某個物品感興趣:Pui = Xu x Yi.T,也就是說 Implicit ALS 輸出的 prediction 其實是用 X x Y.T 所重建出來的 P 矩陣。這些 prediction 的值基本上會落在 [0, 1] 之間,但是也可能大於 1 或是小於 0,值越大表示用戶對該物品有越強烈的偏好,越小則反之。

因為 ALS 屬於 collaborative filtering,所以也有 cold start 的問題,無法對新用戶和新物品做出推薦。除此之外,ALS 的缺點是缺乏解釋性而且難以加入 side information(user 和 item 本身的 features,例如性別、曲風、類型之類的)。不過因為 matrix factorization 其實隱含了 clustering 的功能,相對於其他方式來說,具有抗雜訊的能力。

Collaborative Filtering for Implicit Feedback Datasets
http://yifanhu.net/PUB/cf.pdf

A Gentle Introduction to Recommender Systems with Implicit Feedback
https://jessesw.com/Rec-System/

Intro to Implicit Matrix Factorization: Classic ALS
http://blog.ethanrosenthal.com/2016/10/19/implicit-mf-part-1/

Spark ML 算法原理剖析
https://github.com/endymecy/spark-ml-source-analysis

Recommender System: Collaborative Filtering 協同過濾推薦演算法

Recommender System: Collaborative Filtering 協同過濾推薦演算法

dataset 會是 m 個用戶對 n 個物品的評分 utility matrix
因為通常只有部分用戶和部份物品會有評分資料
所以是一個 sparse matrix(稀疏矩陣)
目標是利用這些稀疏的資料去預測出用戶對他還沒評分過的物品的評分
除了評分之外,也可能是喜歡(和不喜歡)、購買、瀏覽之類的數據
又分成主動評分和被動評分

CF 的缺點:

  • 如果沒有用戶的歷史數據就沒辦法做任何推薦
  • 以及無論 user-based 或 item-based 都需要消耗大量的運算資源
  • 大部分用戶有評分紀錄的資料都只佔所有資料中的很小一部分,matrix 相當稀疏,很難找到相似的資料
  • 會有馬太效應,越熱門的物品越容易被推薦,所以通常都會降低熱門物品的權重

CF 主要分為 memory-based 和 model-based 兩大類
user-based 和 item-based collaborative filtering 屬於 memory-based
memory-based 基本上就是純粹的計算,沒有什麼 Machine Learning 的成分
model-based 才是 Machine Learning 的範疇

User-based Collaborative Filtering

        item_a  item_b  item_c
user_1  2       -       3
user_2  5       2       -
user_3  3       3       1
user_4  -       2       2
# the algorithm from "Mahout in Action"
for every other user w
  compute a similarity s between u and w
  retain the top users, ranked by similarity, as a neighborhood n

for every item i that some user in n has a preference for,
      but that u  has no preference for yet
  for every other user v in n that has a preference for i
    compute a similarity s between u and  v
    incorporate v's preference for i, weighted by s, into a running average

user-based 考慮的是 user 和 user 之間的相似程度

給定一個用戶 A
計算用戶 A 跟其他所有用戶的相似度
找出最相似的 m 個用戶
再找出這些用戶有評分但是用戶 A 沒有評分的物品(也可以額外限制至少要幾個用戶有評分過)
以「相似用戶的相似度」和「該用戶對該物品的評分」來加權算出用戶 A 對這些未評分物品的評分
最後推薦給 A 評分最高的 n 個物品

預測 user_4 對 item_a 的評分 =
(user_4_user_1_sim x user_1_item_a_rating + user_4_user_3_sim x user_3_item_a_rating) / (user_4_user_1_sim + user_4_user_3_sim)

user-based 的特點:

  • 適合 user 遠少於 item 的系統,相似度的計算量會較少
  • item 的時效性強、更多樣的系統,例如新聞、社交網站,適合用 user-based CF
  • 不容易給出推薦理由
  • 驚喜度較高

常用的相似度演算法:

  • Pearson Correlation Coefficient
  • Cosine Similarity
  • Adjusted Cosine Similarity(有些用戶傾向於對所有物品評高分或低分,這個計算方式可以消除這樣的影響)

ref:
https://www.safaribooksonline.com/library/view/mahout-in-action/9781935182689/kindle_split_013.html

Item-based Collaborative Filtering

        user_1  user_2  user_3  user_4
item_a  2       5       3       -
item_b  -       2       3       2
item_c  3       -       1       2
# the algorithm from "Mahout in Action"
for every item i that u has no preference for yet
  for every item j that u has a preference for
    compute a similarity s between i and j
    add u's preference for j, weighted by s, to a running average
return the top items, ranked by weighted average

item-based 考慮的是 item 和 item 之間的相似程度
item-based 用的還是跟 user-based CF 一模一樣的資料
而不是使用 item 本身的特徵(那個叫 content-based)

如果物品數比用戶數還少得多的話
可以事先計算好所有物品之間的相似度
給定一個用戶 A
找出用戶 A 的所有未評分物品
以「用戶 A 的已評分物品對該未評分物品的相似度」和「用戶 A 對已評分物品的評分」來加權算出用戶 A 對這些未評分物品的評分
最後推薦給用戶 A 評分最高的 n 個物品

預測 user_4 對 item_a 的評分 =
(item_b_item_a_sim x user_4_item_b_rating + item_c_item_a_sim x user_4_item_c_rating) / (item_b_item_a_sim + item_c_item_a_sim)

也可以無視用戶 A 的歷史評分資料(或是根本沒有用戶 A 的歷史資料)
直接推薦跟某個物品最相似的 n 個物品

item-based 的特點:

  • 適合 item 遠少於 user 的系統,相似度的計算量會較少
  • 購物、電影、音樂、書籍等系統,用戶的興趣相對固定,適合用 item-based CF
  • 只會推薦類似的東西,驚喜度和多樣性較低
  • 通常只有在用戶量比較小的時候才需要頻繁地重新計算物品之間的相似度,隨著用戶量越大,物品的相似度會趨於穩定

ref:
https://ashokharnal.wordpress.com/2014/12/18/worked-out-example-item-based-collaborative-filtering-for-recommenmder-engine/
http://blog.csdn.net/huagong_adu/article/details/7362908

Slope One Recommender

        item_a  item_b  item_c
user_1  5       3       2
user_2  3       4       -
user_3  -       2       5
# the algorithm from "Mahout in Action"
for every item i the user u expresses no preference for
  for every item j that user u expresses a preference for
    find the average preference difference between j and i
    add this diff to u's preference value for j
    add this to a running average
return the top items, ranked by these averages

因為 memory-based collaborative filtering 的其中一個問題是數據量很大時計算量也會很可觀
所有就有人提出 Slope One 這種簡單粗暴的演算法來
雖然 Slope One 還是得計算所有物品兩兩之間的平均差異

Slope One 假設任兩個物品之間的評分都是一個 y = mx + b 而且 m = 1(斜率為 1)的線性關係
item_a 平均比 item_b 多 (2 + (-1)) / 2 = 0.5
item_a 平均比 item_c 多 (5 - 2) / 1 = 3
如果用 user_3 對 item_b 的評分來預測他對 item_a 的評分會是 2 + 0.5 = 2.5
如果用 user_3 對 item_c 的評分來預測他對 item_a 的評分會是 5 + 3 = 8
通常會用有多少人同時評分來加權多個評分

預測 user_3 對 item_a 的評分 =
((同時對 item_a 和 item_b 評分的人數 x user_3 用 item_b 對 item_a 的預測評分) + (同時對 item_a 和 item_c 評分的人數 x user_3 用 item_c 對 item_a 的預測評分)) / (同時對 item_a 和 item_b 評分的人數 + 同時對 item_a 和 item_c 評分的人數)
((2 x 2.5) + (1 x 8)) / (2 + 1) = 4.33

ref:
https://en.wikipedia.org/wiki/Slope_One