Build a recommender system with Spark: Implicit ALS

In this series of articles, we will use tools such as Apache Spark, XGBoost, Elasticsearch, and MySQL to build a Machine Learning pipeline for a recommender system. A recommender system can be roughly divided into two parts: Candidate Generation and Ranking. The former generates a set of candidate items for each user, using methods such as Collaborative Filtering, Content-based filtering, tag matching, popularity charts, or manual curation; the latter ranks those candidates and presents the final recommendations as a Top N list, commonly with methods such as Logistic Regression.

In this article, we take Collaborative Filtering, one of the most common methods in the Candidate Generation stage, as an example and use Apache Spark's ALS (Alternating Least Squares) model to build a recommender system for GitHub repositories. Using users' repo starring records as training data, it recommends other repos a user might be interested in as the candidate item set.

The complete source code can be found at https://github.com/vinta/albedo.

Articles in this series:

Submit the Application

Since we need to read from a MySQL database over JDBC, the MySQL driver must be installed. You can install the required Java packages on every machine in the cluster with the --packages "mysql:mysql-connector-java:5.1.41" argument.

$ spark-submit \
--packages "com.github.fommil.netlib:all:1.1.2,mysql:mysql-connector-java:5.1.41" \
--master spark://YOUR_SPARK_MASTER:7077 \
--py-files deps.zip \
train_als.py -u vinta

ref:
https://spark.apache.org/docs/latest/submitting-applications.html

Load Data

Load the data from the MySQL database. You can use the predicates parameter to specify WHERE conditions, although strictly speaking this parameter is meant to control the number of partitions: each condition becomes one partition.

Assume the columns of app_repostarring are as follows:

CREATE TABLE `app_repostarring` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `from_user_id` int(11) NOT NULL,
  `from_username` varchar(39) NOT NULL,
  `repo_owner_id` int(11) NOT NULL,
  `repo_owner_username` varchar(39) NOT NULL,
  `repo_owner_type` varchar(16) NOT NULL,
  `repo_id` int(11) NOT NULL,
  `repo_name` varchar(100) NOT NULL,
  `repo_full_name` varchar(140) NOT NULL,
  `repo_description` varchar(191) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT NULL,
  `repo_language` varchar(32) NOT NULL,
  `repo_created_at` datetime(6) NOT NULL,
  `repo_updated_at` datetime(6) NOT NULL,
  `starred_at` datetime(6) NOT NULL,
  `stargazers_count` int(11) NOT NULL,
  `forks_count` int(11) NOT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `from_user_id_repo_id` (`from_user_id`, `repo_id`)
);

def loadRawData():
    url = 'jdbc:mysql://127.0.0.1:3306/albedo?user=root&password=123&verifyServerCertificate=false&useSSL=false'
    properties = {'driver': 'com.mysql.jdbc.Driver'}
    rawDF = spark.read.jdbc(url, table='app_repostarring', properties=properties)
    return rawDF

rawDF = loadRawData()
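
As a quick sketch of the predicates variant mentioned above, where each WHERE condition becomes one partition; the date boundaries here are made up purely for illustration:

def loadRawDataWithPredicates():
    url = 'jdbc:mysql://127.0.0.1:3306/albedo?user=root&password=123&verifyServerCertificate=false&useSSL=false'
    properties = {'driver': 'com.mysql.jdbc.Driver'}
    # one partition per condition
    predicates = [
        "starred_at < '2016-01-01'",
        "starred_at >= '2016-01-01' AND starred_at < '2017-01-01'",
        "starred_at >= '2017-01-01'",
    ]
    return spark.read.jdbc(url, table='app_repostarring', predicates=predicates, properties=properties)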

ref:
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=jdbc#pyspark.sql.DataFrameReader.jdbc
http://www.gatorsmile.io/numpartitionsinjdbc/

Preprocess Data

Format Data

Reshape the raw data into the format user, item, rating, starred_at. starred_at is only used for ordering when evaluating the model; it is not used during training, because Spark's ALS has no easy way to incorporate side information.

from pyspark.ml import Transformer
import pyspark.sql.functions as F

class RatingBuilder(Transformer):

    def _transform(self, rawDF):
        ratingDF = rawDF \
            .selectExpr('from_user_id AS user', 'repo_id AS item', '1 AS rating', 'starred_at') \
            .orderBy('user', F.col('starred_at').desc())
        return ratingDF

ratingBuilder = RatingBuilder()
ratingDF = ratingBuilder.transform(rawDF)
ratingDF.cache()

ref:
http://blog.ethanrosenthal.com/2016/11/07/implicit-mf-part-2/

Inspect Data

import pyspark.sql.functions as F

ratingDF.rdd.getNumPartitions()
# 200

ratingDF.agg(F.count('rating'), F.countDistinct('user'), F.countDistinct('item')).show()
# +-------------+--------------------+--------------------+
# |count(rating)|count(DISTINCT user)|count(DISTINCT item)|
# +-------------+--------------------+--------------------+
# |      3121629|               10483|              551216|
# +-------------+--------------------+--------------------+

stargazersCountDF = ratingDF \
    .groupBy('item') \
    .agg(F.count('user').alias('stargazers_count')) \
    .orderBy('stargazers_count', ascending=False)
stargazersCountDF.show(10)
# +--------+----------------+
# |    item|stargazers_count|
# +--------+----------------+
# | 2126244|            2211|
# |10270250|            1683|
# |  943149|            1605|
# |  291137|            1567|
# |13491895|            1526|
# | 9384267|            1480|
# | 3544424|            1468|
# | 7691631|            1441|
# |29028775|            1427|
# | 1334369|            1399|
# +--------+----------------+

starredCountDF = ratingDF \
    .groupBy('user') \
    .agg(F.count('item').alias('starred_count')) \
    .orderBy('starred_count', ascending=False)
starredCountDF.show(10)
# +-------+-------------+
# |   user|starred_count|
# +-------+-------------+
# |3947125|         8947|
# |5527642|         7978|
# | 446613|         7860|
# | 627410|         7800|
# |  13998|         6334|
# |2467194|         6327|
# |  63402|         6034|
# |2005841|         6024|
# |5073946|         5980|
# |   2296|         5862|
# +-------+-------------+

Clean Data

You can filter out items that were starred by too few users, and users who starred too few items, to increase the density of the matrix. This is exactly the Cold Start problem: you simply do not have enough data about those items and users (consider a content-based approach for them). Besides, if the items your system recommends have only been starred by a handful of people, then even if you have perfectly mined the long tail, the "first impression" those recommendations give users may not be great (and that may decide whether they keep using the system, or whether they actually try the thing you recommended).

You can also decide whether to filter out items starred by a huge number of users, and users who starred a huge number of items. If an item has been starred by, say, 80-90% of all users, there is probably no point in recommending something that popular, because the remaining users will discover it on their own sooner or later. If a few users have starred more than half of all items, they are probably either some kind of web crawler or the kind of people who star everything they see; either way, they are probably not the users you want to model, so consider dropping them from the dataset.

In practice, if you have a blacklist of users or items, such as SPAM accounts or NSFW content, this is also a good step to filter them out.
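
A rough sketch of that blacklist filtering, assuming a hypothetical list of banned repo ids (in practice it could just as well live inside the DataCleaner below):

import pyspark.sql.functions as F

# hypothetical blacklist of repo ids (e.g. SPAM or NSFW repos)
blacklistedItems = [123456, 234567]
ratingDF = ratingDF.where(~F.col('item').isin(blacklistedItems))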

from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import Param
import pyspark.sql.functions as F

class DataCleaner(Transformer):

    @keyword_only
    def __init__(self, minItemStargazersCount=None, maxItemStargazersCount=None, minUserStarredCount=None, maxUserStarredCount=None):
        super(DataCleaner, self).__init__()
        self.minItemStargazersCount = Param(self, 'minItemStargazersCount', 'Drop items whose stargazer count is below this value')
        self.maxItemStargazersCount = Param(self, 'maxItemStargazersCount', 'Drop items whose stargazer count is above this value')
        self.minUserStarredCount = Param(self, 'minUserStarredCount', 'Drop users whose starred repo count is below this value')
        self.maxUserStarredCount = Param(self, 'maxUserStarredCount', 'Drop users whose starred repo count is above this value')
        self._setDefault(minItemStargazersCount=1, maxItemStargazersCount=50000, minUserStarredCount=1, maxUserStarredCount=50000)
        kwargs = self.__init__._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, minItemStargazersCount=None, maxItemStargazersCount=None, minUserStarredCount=None, maxUserStarredCount=None):
        kwargs = self.setParams._input_kwargs
        return self._set(**kwargs)

    def setMinItemStargazersCount(self, value):
        self._paramMap[self.minItemStargazersCount] = value
        return self

    def getMinItemStargazersCount(self):
        return self.getOrDefault(self.minItemStargazersCount)

    def setMaxItemStargazersCount(self, value):
        self._paramMap[self.maxItemStargazersCount] = value
        return self

    def getMaxItemStargazersCount(self):
        return self.getOrDefault(self.maxItemStargazersCount)

    def setMinUserStarredCount(self, value):
        self._paramMap[self.minUserStarredCount] = value
        return self

    def getMinUserStarredCount(self):
        return self.getOrDefault(self.minUserStarredCount)

    def setMaxUserStarredCount(self, value):
        self._paramMap[self.maxUserStarredCount] = value
        return self

    def getMaxUserStarredCount(self):
        return self.getOrDefault(self.maxUserStarredCount)

    def _transform(self, ratingDF):
        minItemStargazersCount = self.getMinItemStargazersCount()
        maxItemStargazersCount = self.getMaxItemStargazersCount()
        minUserStarredCount = self.getMinUserStarredCount()
        maxUserStarredCount = self.getMaxUserStarredCount()

        toKeepItemsDF = ratingDF \
            .groupBy('item') \
            .agg(F.count('user').alias('stargazers_count')) \
            .where('stargazers_count >= {0} AND stargazers_count <= {1}'.format(minItemStargazersCount, maxItemStargazersCount)) \
            .orderBy('stargazers_count', ascending=False) \
            .select('item', 'stargazers_count')
        temp1DF = ratingDF.join(toKeepItemsDF, 'item', 'inner')

        toKeepUsersDF = temp1DF \
            .groupBy('user') \
            .agg(F.count('item').alias('starred_count')) \
            .where('starred_count >= {0} AND starred_count <= {1}'.format(minUserStarredCount, maxUserStarredCount)) \
            .orderBy('starred_count', ascending=False) \
            .select('user', 'starred_count')
        temp2DF = temp1DF.join(toKeepUsersDF, 'user', 'inner')

        cleanDF = temp2DF.select('user', 'item', 'rating', 'starred_at')
        return cleanDF

dataCleaner = DataCleaner(
    minItemStargazersCount=2,
    maxItemStargazersCount=4000,
    minUserStarredCount=2,
    maxUserStarredCount=5000
)
cleanDF = dataCleaner.transform(ratingDF)

cleanDF.agg(F.count('rating'), F.countDistinct('user'), F.countDistinct('item')).show()
# +-------------+--------------------+--------------------+
# |count(rating)|count(DISTINCT user)|count(DISTINCT item)|
# +-------------+--------------------+--------------------+
# |      2761118|               10472|              245626|
# +-------------+--------------------+--------------------+

Generate Negative Samples

For ALS with implicit feedback, manually adding negative samples (samples with Rui = 0) is pointless: to this algorithm, a missing / non-observed value is already 0, meaning the user indeed took no action on that item, i.e. Pui = 0 (no preference), so its confidence Cui = 1 + alpha x 0 is already lower than that of the positive samples. Moreover, since Spark ML's ALS only iterates over entries with Rui > 0, adding negative samples with Rui = 0 or Rui = -1 by hand has no effect on the model at all.
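
To make the Pui / Cui formulas above concrete, here is a tiny standalone sketch (plain Python, not Spark; alpha = 40 matches the value used when training the model later):

alpha = 40
for r_ui in [0, 1, 5]:
    p_ui = 1 if r_ui > 0 else 0    # binarized preference
    c_ui = 1 + alpha * r_ui        # confidence weight
    print(r_ui, p_ui, c_ui)
# 0 0 1
# 1 1 41
# 5 1 201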

Without negative samples you cannot compute binary-classifier metrics such as the area under the ROC curve or the area under the Precision-Recall curve, but you can switch to Learning-to-Rank style evaluation instead, e.g. NDCG or Mean Average Precision. That said, ALS's loss function cannot directly optimize metrics like NDCG anyway.

ref:
https://vinta.ws/code/generate-negative-samples-for-recommender-system.html

Split Data

Because Matrix Factorization has to consider every user-item pair, the model cannot make recommendations for data it has never seen (the cold start problem). If either the user or the item is absent from the dataset, the prediction the ALS model outputs will be NaN. So you should try to keep every user and every item present in both the training set and the test set, for example by randomly picking n ratings (or an n fraction of the ratings) of each user as the test set and keeping the rest as the training set (so-called leave-n-out). If you use the holdout approach common in Machine Learning and randomly scatter all data points into a training set and a test set (e.g. df.randomSplit([0.7, 0.3])), there is a high chance that some users or items end up in only one of the two datasets.

ref:
https://jessesw.com/Rec-System/
http://blog.ethanrosenthal.com/2016/10/19/implicit-mf-part-1/

The LibRec documentation also lists several other ways of splitting the data, for example:

  • Ratio-based splitting divides the data into two parts by a given ratio. The split can be done randomly over all of the data, or along the user or item dimension. When a time feature is available, the last portion of the data in time order can be held out for testing.
  • LooCV splitting is leave-one-user/item/rating-out: randomly pick one item of each user, or one user of each item, as the test data, and use the rest as the training data. Both user-based and item-based variants are implemented.
  • GivenN splitting holds out a specified number N of records per user as test cases, with the remaining samples used as training data.
  • KCV is K-fold cross-validation: the data is split into K parts, and each run uses one of them as the test data and the rest as training data, for K runs in total. The K results are combined to evaluate the performance of the recommendation algorithm.

ref:
https://www.librec.net/dokuwiki/doku.php?id=DataModel_zh#splitter
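
If you would rather do the leave-n-out / GivenN style split described above instead of a ratio-based one, a rough sketch (the n value and the seed are arbitrary) could look like this:

from pyspark.sql import Window
import pyspark.sql.functions as F

def leaveNOutSplitByUser(df, n=5, seed=42):
    # hold out n randomly chosen ratings per user as the test set
    # (users with fewer than n ratings would end up entirely in the test set)
    windowSpec = Window.partitionBy('user').orderBy(F.rand(seed))
    rankedDF = df.withColumn('rand_rank', F.row_number().over(windowSpec))
    test = rankedDF.where('rand_rank <= {0}'.format(n)).drop('rand_rank')
    training = rankedDF.where('rand_rank > {0}'.format(n)).drop('rand_rank')
    return training, test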

Here we use sampleBy() to write a simple function that randomly splits each user's items into a training set and a test set.

def randomSplitByUser(df, weights, seed=None):
    trainingRatio = weights[0]
    fractions = {row['user']: trainingRatio for row in df.select('user').distinct().collect()}
    training = df.sampleBy('user', fractions, seed)
    testRDD = df.rdd.subtract(training.rdd)
    test = spark.createDataFrame(testRDD, df.schema)
    return training, test

training, test = randomSplitByUser(ratingDF, weights=[0.7, 0.3])

ref:
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.sampleBy

Train the Model

from pyspark.ml.recommendation import ALS

als = ALS(implicitPrefs=True, seed=42) \
    .setRank(50) \
    .setMaxIter(22) \
    .setRegParam(0.5) \
    .setAlpha(40)

alsModel = als.fit(training)

# these are the learned latent factors of users and items
alsModel.userFactors.show()
alsModel.itemFactors.show()

ref:
https://spark.apache.org/docs/latest/ml-collaborative-filtering.html
https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.recommendation.ALS

Predict Preferences

from pyspark.ml import Transformer

predictedDF = alsModel.transform(test)

class PredictionProcessor(Transformer):

    def _transform(self, predictedDF):
        nonNullDF = predictedDF.dropna(subset=['prediction', ])
        predictionDF = nonNullDF.withColumn('prediction', nonNullDF['prediction'].cast('double'))
        return predictionDF

# drop the rows whose prediction is NaN
predictionProcessor = PredictionProcessor()
predictionDF = predictionProcessor.transform(predictedDF)

Evaluate the Model

Spark ML does not provide a ranking evaluator for DataFrames, so we have to write our own, though internally it still uses Spark MLlib's RankingMetrics. Note that this is only offline evaluation; before actually going to production you will probably also want to run A/B tests.

from pyspark import keyword_only
from pyspark.ml.evaluation import Evaluator
from pyspark.ml.param.shared import Param
from pyspark.mllib.evaluation import RankingMetrics
from pyspark.sql import Window
from pyspark.sql.functions import col
from pyspark.sql.functions import expr
import pyspark.sql.functions as F

class RankingEvaluator(Evaluator):

    @keyword_only
    def __init__(self, k=None):
        super(RankingEvaluator, self).__init__()
        self.k = Param(self, 'k', 'Top K')
        self._setDefault(k=30)
        kwargs = self.__init__._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, k=None):
        kwargs = self.setParams._input_kwargs
        return self._set(**kwargs)

    def isLargerBetter(self):
        return True

    def setK(self, value):
        self._paramMap[self.k] = value
        return self

    def getK(self):
        return self.getOrDefault(self.k)

    def _evaluate(self, predictedDF):
        k = self.getK()

        predictedDF.show()

        windowSpec = Window.partitionBy('user').orderBy(col('prediction').desc())
        perUserPredictedItemsDF = predictedDF \
            .select('user', 'item', 'prediction', F.rank().over(windowSpec).alias('rank')) \
            .where('rank <= {0}'.format(k)) \
            .groupBy('user') \
            .agg(expr('collect_list(item) as items'))

        windowSpec = Window.partitionBy('user').orderBy(col('starred_at').desc())
        perUserActualItemsDF = predictedDF \
            .select('user', 'item', 'starred_at', F.rank().over(windowSpec).alias('rank')) \
            .where('rank <= {0}'.format(k)) \
            .groupBy('user') \
            .agg(expr('collect_list(item) as items'))

        perUserItemsRDD = perUserPredictedItemsDF.join(F.broadcast(perUserActualItemsDF), 'user', 'inner') \
            .rdd \
            .map(lambda row: (row[1], row[2]))

        if perUserItemsRDD.isEmpty():
            return 0.0

        rankingMetrics = RankingMetrics(perUserItemsRDD)
        metric = rankingMetrics.ndcgAt(k)
        return metric

k = 30
rankingEvaluator = RankingEvaluator(k=k)
ndcg = rankingEvaluator.evaluate(predictionDF)
print('NDCG', ndcg)

ref:
https://spark.apache.org/docs/latest/mllib-evaluation-metrics.html

Recommend Items

Let's get a feel for how well the recommender works. Here the results are simply printed out instead of being written to a database. In practice you usually would not show the raw output of the recommender directly to users: it typically goes through additional filtering, re-ranking, and generation of recommendation reasons, or has manual rules applied, such as forcing in ads or currently promoted products, or filtering out items that get many clicks but are actually of poor quality. The output of this recommender may also be fed into other machine learning models as input.

ref:
https://www.zhihu.com/question/28247353

def recommendItems(rawDF, alsModel, username, topN=30, excludeKnownItems=False):
    userID = rawDF \
        .where('from_username = "{0}"'.format(username)) \
        .select('from_user_id') \
        .take(1)[0]['from_user_id']

    userItemsDF = alsModel \
        .itemFactors \
        .selectExpr('{0} AS user'.format(userID), 'id AS item')
    if excludeKnownItems:
        userKnownItemsDF = rawDF \
            .where('from_user_id = {0}'.format(userID)) \
            .selectExpr('repo_id AS item')
        userItemsDF = userItemsDF.join(userKnownItemsDF, 'item', 'left_anti')

    userPredictedDF = alsModel \
        .transform(userItemsDF) \
        .select('item', 'prediction') \
        .orderBy('prediction', ascending=False) \
        .limit(topN)

    repoDF = rawDF \
        .groupBy('repo_id', 'repo_full_name', 'repo_language') \
        .agg(F.max('stargazers_count').alias('stargazers_count'))

    recommendedItemsDF = userPredictedDF \
        .join(repoDF, userPredictedDF['item'] == repoDF['repo_id'], 'inner') \
        .select('prediction', 'repo_full_name', 'repo_language', 'stargazers_count') \
        .orderBy('prediction', ascending=False)

    return recommendedItemsDF

k = 30
username = 'vinta'
recommendedItemsDF = recommendItems(rawDF, alsModel, username, topN=k, excludeKnownItems=False)
for item in recommendedItemsDF.collect():
    repoName = item['repo_full_name']
    repoUrl = 'https://github.com/{0}'.format(repoName)
    print(repoUrl, item['prediction'], item['repo_language'], item['stargazers_count'])

ref:
https://github.com/vinta/albedo/blob/master/src/main/python/train_als.ipynb

Cross-validate Models

Use a Spark ML pipeline to do cross-validation and pick the most suitable combination of hyperparameters.

  • rank: The number of latent factors in the model, or equivalently, the number of columns k in the user-feature and product-feature matrices.
  • regParam: A standard overfitting parameter, also usually called lambda. Higher values resist overfitting, but values that are too high hurt the factorization’s accuracy.
  • alpha: Controls the relative weight of observed versus unobserved user-product interactions in the factorization.
  • maxIter: The number of iterations that the factorization runs. More iterations take more time but may produce a better factorization.

from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

dataCleaner = DataCleaner()

als = ALS(implicitPrefs=True, seed=42)

predictionProcessor = PredictionProcessor()

pipeline = Pipeline(stages=[
    dataCleaner,
    als,
    predictionProcessor,
])

paramGrid = ParamGridBuilder() \
    .addGrid(dataCleaner.minItemStargazersCount, [1, 10, 100]) \
    .addGrid(dataCleaner.maxItemStargazersCount, [4000, ]) \
    .addGrid(dataCleaner.minUserStarredCount, [1, 10, 100]) \
    .addGrid(dataCleaner.maxUserStarredCount, [1000, 4000, ]) \
    .addGrid(als.rank, [50, 100]) \
    .addGrid(als.regParam, [0.01, 0.1, 0.5]) \
    .addGrid(als.alpha, [0.01, 0.89, 1, 40, ]) \
    .addGrid(als.maxIter, [22, ]) \
    .build()

rankingEvaluator = RankingEvaluator(k=30)

cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=paramGrid,
                    evaluator=rankingEvaluator,
                    numFolds=2)

cvModel = cv.fit(ratingDF)

def printCrossValidationParameters(cvModel):
    metric_params_pairs = list(zip(cvModel.avgMetrics, cvModel.getEstimatorParamMaps()))
    metric_params_pairs.sort(key=lambda x: x[0], reverse=True)
    for pair in metric_params_pairs:
        metric, params = pair
        print('metric', metric)
        for k, v in params.items():
            print(k.name, v)
        print('')

printCrossValidationParameters(cvModel)

ref:
https://spark.apache.org/docs/latest/ml-pipeline.html

Generate negative samples for recommender system?

According to "推荐系统实践" (Recommender System Practice), negative samples should be chosen following these principles (a rough sketch follows after the reference below):

  • For each user, keep the numbers of positive and negative samples balanced (roughly equal).
  • When sampling negative items for a user, pick items that are very popular but that the user has taken no action on.
  • The intuition is that a very popular item the user has never interacted with more strongly suggests genuine disinterest; for an obscure item, the user may simply never have discovered it on the site, so you cannot tell whether they are interested or not.

ref:
http://www.duokan.com/reader/www/app.html?id=ed873c9e323511e28a9300163e0123ac
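
For models that do need explicit negatives (a downstream binary classifier, say, rather than Spark's implicit ALS discussed below), a rough sketch of the "popular but not interacted with" sampling idea might look like this; the size of the popular pool and the balancing step are assumptions:

import pyspark.sql.functions as F

# take the most-starred repos as the pool of "popular" items
popularItemsDF = ratingDF \
    .groupBy('item') \
    .agg(F.count('user').alias('stargazers_count')) \
    .orderBy('stargazers_count', ascending=False) \
    .limit(1000) \
    .select('item')

# pair every user with every popular item, then drop the pairs the user has actually starred
negativeDF = ratingDF.select('user').distinct() \
    .crossJoin(popularItemsDF) \
    .join(ratingDF.select('user', 'item'), ['user', 'item'], 'left_anti') \
    .withColumn('rating', F.lit(0))

# in practice you would then sample negativeDF per user (e.g. with sampleBy())
# so that the positive and negative counts per user stay roughly balanced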

However, if you use Spark ML's ALS(implicitPrefs=True), you do not need to add negative samples manually. For ALS with implicit feedback, manually adding negative samples (samples with Rui = 0) is pointless: to this algorithm, a missing / non-observed value is already 0, meaning the user indeed took no action on that item, i.e. Pui = 0 (no preference), so its confidence Cui = 1 + alpha x 0 is already lower than that of the positive samples. Moreover, since Spark ML's ALS only iterates over entries with Rui > 0, adding negative samples with Rui = 0 or Rui = -1 by hand has no effect on the model at all.

The models trained on the following three datasets are identical:

from pyspark.ml.recommendation import ALS

matrix = [
    (1, 1, 0),
    (1, 2, 1),
    (1, 3, 0),
    (1, 4, 1),
    (1, 5, 1),
    (2, 1, 1),
    (2, 2, 1),
    (2, 3, 0),
    (2, 4, 1),
    (2, 5, 1),
    (3, 1, 1),
    (3, 2, 1),
    (3, 3, 1),
    (3, 4, 1),
    (3, 5, 0),
]
df0 = spark.createDataFrame(matrix, ['user', 'item', 'rating'])

matrix = [
    (1, 1, -1),
    (1, 2, 1),
    (1, 3, -1),
    (1, 4, 1),
    (1, 5, 1),
    (2, 1, 1),
    (2, 2, 1),
    (2, 3, -1),
    (2, 4, 1),
    (2, 5, 1),
    (3, 1, 1),
    (3, 2, 1),
    (3, 3, 1),
    (3, 4, 1),
    (3, 5, -1),
]
df1 = spark.createDataFrame(matrix, ['user', 'item', 'rating'])

matrix = [
    (1, 2, 1),
    (1, 4, 1),
    (1, 5, 1),
    (2, 1, 1),
    (2, 2, 1),
    (2, 4, 1),
    (2, 5, 1),
    (3, 1, 1),
    (3, 2, 1),
    (3, 3, 1),
    (3, 4, 1),
]
df2 = spark.createDataFrame(matrix, ['user', 'item', 'rating'])

als = ALS(implicitPrefs=True, seed=42, nonnegative=False).setRank(7).setMaxIter(15).setRegParam(0.01).setAlpha(40)
alsModel = als.fit(df0)
alsModel.userFactors.select('features').show(truncate=False)
alsModel.itemFactors.select('features').show(truncate=False)

als = ALS(implicitPrefs=True, seed=42, nonnegative=False).setRank(7).setMaxIter(15).setRegParam(0.01).setAlpha(40)
alsModel = als.fit(df1)
alsModel.userFactors.select('features').show(truncate=False)
alsModel.itemFactors.select('features').show(truncate=False)

als = ALS(implicitPrefs=True, seed=42, nonnegative=False).setRank(7).setMaxIter(15).setRegParam(0.01).setAlpha(40)
alsModel = als.fit(df2)
alsModel.userFactors.select('features').show(truncate=False)
alsModel.itemFactors.select('features').show(truncate=False)

ref:
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L1626
https://github.com/apache/spark/commit/b05b3fd4bacff1d8b1edf4c710e7965abd2017a7
https://www.mail-archive.com/[email protected]/msg60240.html
http://apache-spark-user-list.1001560.n3.nabble.com/implicit-ALS-dataSet-td7067.html

Spark ML cookbook (Python)

Calculate percentage of sparsity of a user-item rating DataFrame

result = ratingDF.agg(F.count('rating'), F.countDistinct('user'), F.countDistinct('item')).collect()[0]
totalUserCount = result['count(DISTINCT user)']
totalItemCount = result['count(DISTINCT item)']
nonZeroRatingCount = result['count(rating)']

density = (nonZeroRatingCount / (totalUserCount * totalItemCount)) * 100
sparsity = 100 - density

Recommend items for single user

topN = 30
userID = 123

userItemsDF = alsModel \
    .itemFactors \
    .selectExpr('{0} AS user'.format(userID), 'id AS item')
userPredictedDF = alsModel \
    .transform(userItemsDF) \
    .select('item', 'prediction') \
    .orderBy('prediction', ascending=False) \
    .limit(topN)

ref:
https://www.safaribooksonline.com/library/view/advanced-analytics-with/9781491972946/ch03.html
https://www.safaribooksonline.com/library/view/building-a-recommendation/9781785282584/ch06s04.html
https://www.safaribooksonline.com/library/view/building-recommendation-engines/9781785884856/ch07s05.html

Recommend items for every user (a slightly fast way)

from pyspark.mllib.recommendation import MatrixFactorizationModel
from numpy import array, asarray, matrix, squeeze

myModel = MatrixFactorizationModel.load(sc, 'BingBong')
m1 = myModel.productFeatures()
m2 = m1.map(lambda product_feature: product_feature[1]).collect()
m3 = matrix(m2).transpose()
pf = sc.broadcast(m3)
uf = myModel.userFeatures().coalesce(100)

# get predictions for all users
f1 = uf.map(lambda user_features: (user_features[0], squeeze(asarray(matrix(array(user_features[1])) * pf.value))))

ref:
https://www.slideshare.net/SparkSummit/26-trillion-app-recomendations-using-100-lines-of-spark-code-ayman-farahat

Evaluate a binary classification

from pyspark.ml.evaluation import BinaryClassificationEvaluator

matrix = [
    (0.5, 1),
    (2.0, 1),
    (0.8, 1),
    (0.2, 0),
    (0.1, 0),
    (0.4, 0),
]
predictions = spark.createDataFrame(matrix, ['prediction', 'label'])
predictions = predictions.withColumn('prediction', predictions['prediction'].cast('double'))

evaluator = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='label', metricName='areaUnderROC')
evaluator.evaluate(predictions)

Calculate ranking metrics

from pyspark.mllib.evaluation import RankingMetrics
from pyspark.sql import Window
from pyspark.sql.functions import col, expr
import pyspark.sql.functions as F

k = 10

windowSpec = Window.partitionBy('user').orderBy(col('prediction').desc())
perUserPredictedItemsDF = outputDF \
    .select('user', 'item', 'prediction', F.rank().over(windowSpec).alias('rank')) \
    .where('rank <= {0}'.format(k)) \
    .groupBy('user') \
    .agg(expr('collect_list(item) as items'))
perUserPredictedItemsDF.show()
# +--------+--------------------+
# |    user|               items|
# +--------+--------------------+
# |    2142|[36560369, 197450...|
# |   47217|[40693501, 643554...|
# +--------+--------------------+

windowSpec = Window.partitionBy('from_user_id').orderBy(col('starred_at').desc())
perUserActualItemsDF = rawDF \
    .select('from_user_id', 'repo_id', 'starred_at', F.rank().over(windowSpec).alias('rank')) \
    .where('rank <= {0}'.format(k)) \
    .groupBy('from_user_id') \
    .agg(expr('collect_list(repo_id) as items')) \
    .withColumnRenamed('from_user_id', 'user')
# +--------+--------------------+
# |    user|               items|
# +--------+--------------------+
# |    2142|[29122050, 634846...|
# |   59990|[9820191, 8729416...|
# +--------+--------------------+

perUserItemsRDD = perUserPredictedItemsDF.join(perUserActualItemsDF, 'user') \
    .rdd \
    .map(lambda row: (row[1], row[2]))
rankingMetrics = RankingMetrics(perUserItemsRDD)

print(rankingMetrics.meanAveragePrecision)
print(rankingMetrics.precisionAt(k))
print(rankingMetrics.ndcgAt(k))

ref:
https://www.safaribooksonline.com/library/view/spark-the-definitive/9781491912201/ch19.html

Create a custom Transformer

from pyspark.ml import Transformer

class PredictionProcessor(Transformer):

    def _transform(self, predictedDF):
        nonNullDF = predictedDF.dropna(subset=['prediction', ])
        predictionDF = nonNullDF.withColumn('prediction', nonNullDF['prediction'].cast('double'))
        return predictionDF

ref:
http://stackoverflow.com/questions/32331848/create-a-custom-transformer-in-pyspark-ml

RankingMetrics

The result is a value between 0 and 1; higher is better.

from pyspark.mllib.evaluation import RankingMetrics

predictionAndLabels = sc.parallelize([
    ([1, 2, 3, 4], [1, 2, 3, 4]),
    ([1, ], [1, 10]),
    ([6, 4, 2], [6, 2, 100, 8, 2, 55]),
])
metrics = RankingMetrics(predictionAndLabels)
metrics.ndcgAt(5)

ref:
https://spark.apache.org/docs/latest/mllib-evaluation-metrics.html#ranking-systems

Create a custom Evaluator

from pyspark import keyword_only
from pyspark.ml.evaluation import Evaluator
from pyspark.ml.param.shared import Param
from pyspark.mllib.evaluation import RankingMetrics
from pyspark.sql import Window
from pyspark.sql.functions import col
from pyspark.sql.functions import expr
import pyspark.sql.functions as F

class RankingEvaluator(Evaluator):

    @keyword_only
    def __init__(self, k=None):
        super(RankingEvaluator, self).__init__()
        self.k = Param(self, 'k', 'Top K')
        self._setDefault(k=30)
        kwargs = self.__init__._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, k=None):
        kwargs = self.setParams._input_kwargs
        return self._set(**kwargs)

    def isLargerBetter(self):
        return True

    def setK(self, value):
        self._paramMap[self.k] = value
        return self

    def getK(self):
        return self.getOrDefault(self.k)

    def _evaluate(self, outputDF):
        k = self.getK()

        windowSpec = Window.partitionBy('user').orderBy(col('prediction').desc())
        perUserPredictedItemsDF = outputDF \
            .select('user', 'item', 'prediction', F.rank().over(windowSpec).alias('rank')) \
            .where('rank <= {0}'.format(k)) \
            .groupBy('user') \
            .agg(expr('collect_list(item) as items'))

        windowSpec = Window.partitionBy('user').orderBy(col('starred_at').desc())
        perUserActualItemsDF = outputDF \
            .select('user', 'item', 'starred_at', F.rank().over(windowSpec).alias('rank')) \
            .where('rank <= {0}'.format(k)) \
            .groupBy('user') \
            .agg(expr('collect_list(item) as items'))

        perUserItemsRDD = perUserPredictedItemsDF.join(F.broadcast(perUserActualItemsDF), 'user', 'inner') \
            .rdd \
            .map(lambda row: (row[1], row[2]))
        rankingMetrics = RankingMetrics(perUserItemsRDD)
        metric = rankingMetrics.ndcgAt(k)
        return metric

Show best parameters from a CrossValidatorModel

metric_params_pairs = list(zip(cvModel.avgMetrics, cvModel.getEstimatorParamMaps()))
metric_params_pairs.sort(key=lambda x: x[0], reverse=True)
best_metric_params = metric_params_pairs[0][1]
for pair in metric_params_pairs:
    metric, params = pair
    print('metric', metric)
    for k, v in params.items():
        print(k.name, v)
    print('')
# metric 0.5273636632856705
# rank 50
# regParam 0.01
# maxIter 10
# alpha 1

ref:
https://stackoverflow.com/questions/31749593/how-to-extract-best-parameters-from-a-crossvalidatormodel
https://stackoverflow.com/questions/39529012/pyspark-get-all-parameters-of-models-created-with-paramgridbuilder

Show best parameters from a TrainValidationSplitModel

metric_params_pairs = list(zip(tvModel.validationMetrics, tvModel.getEstimatorParamMaps()))
metric_params_pairs.sort(key=lambda x: x[0], reverse=True)
for pair in metric_params_pairs:
    metric, params = pair
    print('metric', metric)
    for k, v in params.items():
        print(k.name, v)
    print('')
# metric 0.5385481418189484
# rank 50
# regParam 0.1
# maxIter 20
# alpha 1

Spark SQL cookbook (Python)

Access SparkSession

from pyspark.sql import SparkSession

# get the default SparkSession instance
spark = SparkSession.builder.getOrCreate()

sc = spark.sparkContext
sc.setLogLevel('INFO')

Access custom configurations

$ spark-submit \
--master spark://localhost:7077 \
--properties-file my-properties.conf \
your_spark_app.py

# in my-properties.conf
# spark.myapp.db_host 192.168.100.10

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
print(sc.getConf().get('spark.myapp.db_host'))

ref:
https://stackoverflow.com/questions/31115881/how-to-load-java-properties-file-and-use-in-spark

Create a RDD

array = [
    1,
    5,
    7,
    2,
    3,
]
rdd = sc.parallelize(array)
df = rdd.toDF('int')
# +-----+
# |value|
# +-----+
# |    1|
# |    5|
# |    7|
# |    2|
# |    3|
# +-----+

Read data from MySQL

$ pyspark --packages "mysql:mysql-connector-java:5.1.41"

url = 'jdbc:mysql://127.0.0.1:3306/albedo'
properties = {'user': 'root', 'password': '123'}
spark.read.jdbc(url, 'app_repostarring', properties=properties)

ref:
https://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases
https://dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-configuration-properties.html

Write data to MySQL

CREATE TABLE `recommender_songrecommend` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `user_id` int(11) NOT NULL,
  `song_id` int(11) NOT NULL,
  `score` double NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1;

matrix = [
    (1, 2, 0.1),
    (3, 4, 0.23456),
    (5, 6, 7.89),
    (7, 8, -10.111213),
]
df = spark.createDataFrame(matrix, ['user', 'item', 'score'])

url = 'jdbc:mysql://192.168.11.200:3306/DATABASE?user=USER&password=PASSWORD&verifyServerCertificate=false&useSSL=false&rewriteBatchedStatements=true'
properties = {'driver': 'com.mysql.jdbc.Driver'}
df \
    .selectExpr('user AS user_id', 'item AS song_id', 'score') \
    .write.jdbc(url, table='recommender_songrecommend', mode='append', properties=properties)

ref:
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter.jdbc
https://stackoverflow.com/questions/2993251/jdbc-batch-insert-performance/10617768#10617768

Read data from SQLite

$ pyspark --packages "org.xerial:sqlite-jdbc:3.16.1"

from pyspark.sql.functions import lit
from pyspark.sql.types import IntegerType, StructField, StructType, TimestampType

props = {'driver': 'org.sqlite.JDBC', 'date_string_format': 'yyyy-MM-dd HH:mm:ss'}
df = spark.read.jdbc("jdbc:sqlite:db.sqlite3", "app_repostarring", properties=props)

df = df.where(df.stargazers_count >= min_stargazers_count)
df = df.select('from_user_id', 'repo_id', 'created_at')
df = df.toDF('user', 'item', 'item_created_at')
df = df.withColumn('rating', lit(1))
df = df.select('user', 'item', 'rating', 'item_created_at')

schema = StructType([
    StructField('user', IntegerType(), nullable=False),
    StructField('item', IntegerType(), nullable=False),
    StructField('rating', IntegerType(), nullable=False),
    StructField('item_created_at', TimestampType(), nullable=False),
])
df = spark.createDataFrame(df.rdd, schema)

ref:
https://github.com/xerial/sqlite-jdbc

java.text.ParseException: Unparseable date: "2016-04-22 17:26:54"
https://github.com/xerial/sqlite-jdbc/issues/88

Read data from parquet

from pyspark.sql.utils import AnalysisException

raw_df_filepath = 'raw_df.parquet'

try:
    raw_df = spark.read.format('parquet').load(raw_df_filepath)
except AnalysisException as exc:
    if 'Path does not exist' in exc.desc:
        raw_df = load_raw_data()
        raw_df.write.format('parquet').save(raw_df_filepath)
    else:
        raise exc

ref:
https://community.hortonworks.com/articles/21303/write-read-parquet-file-in-spark.html

Create a DataFrame

matrix = [
    (1, 1, 1),
    (1, 2, 1),
    (2, 1, 0),
    (3, 1, 1),
    (3, 3, 1),
    (3, 4, 1),
    (4, 1, 0),
    (4, 2, 0),
    (5, 9, 1),
    (5, 5, 0),
]
df = spark.createDataFrame(matrix, ['user', 'item', 'rating'])

ref:
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.SparkSession.createDataFrame

Create a DataFrame with explicit schema

from pyspark.sql.types import *

matrix = [
    ('Alice', 0.5, 5.0),
    ('Bob',   0.2, 92.0),
    ('Tom',   0.0, 122.0),
    ('Zack',  0.1, 1.0),
]
schema = StructType([
    StructField('name', StringType(), nullable=False),
    StructField('prediction', DoubleType(), nullable=False),
    StructField('rating', DoubleType(), nullable=False)
])
df = spark.createDataFrame(matrix, schema)
df.printSchema()

new_df = spark.createDataFrame(someRDD, df.schema)

ref:
https://github.com/awesome-spark/spark-gotchas/blob/master/05_spark_sql_and_dataset_api.md

Create a nested schema

from pyspark.sql.types import ArrayType
from pyspark.sql.types import FloatType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import StructField
from pyspark.sql.types import StructType

recommendation_schema = StructType([
    StructField('item', IntegerType(), nullable=False),
    StructField('rating', FloatType(), nullable=False),
])

user_recommendations_schema = StructType([
    StructField('user', IntegerType(), nullable=False),
    StructField('recommendations', ArrayType(recommendation_schema), nullable=False),
])

matrix = [
    (1, [[100, 1.0], [200, 2.0]]),
    (2, [[300, 3.0]]),
]
df = spark.createDataFrame(matrix, user_recommendations_schema)

df.printSchema()
# root
#  |-- user: integer (nullable = false)
#  |-- recommendations: array (nullable = false)
#  |    |-- element: struct (containsNull = true)
#  |    |    |-- item: integer (nullable = false)
#  |    |    |-- rating: float (nullable = false)

df.show()
# +----+--------------------+
# |user|     recommendations|
# +----+--------------------+
# |   1|[[100,1.0], [200,...|
# |   2|         [[300,3.0]]|
# +----+--------------------+

ref:
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.types

Change schema of a DataFrame

from pyspark.sql.types import DoubleType, IntegerType, StructField, StructType

schema = StructType([
    StructField('user', IntegerType(), nullable=False),
    StructField('item', IntegerType(), nullable=False),
    StructField('rating', DoubleType(), nullable=False),
])
df = spark.createDataFrame(df.rdd, schema)

Get numbers of partitions

df.rdd.getNumPartitions()

Split a DataFrame into chunks (partitions)

def write_to_db(partial_rows):
    # year, week and your_mysql_insert_func are assumed to be defined elsewhere
    values_tuples = [(row.user, row.item, row.score, year, week) for row in partial_rows]
    result = your_mysql_insert_func(values_tuples)
    return [result, ]

save_result = df \
    .repartition(400) \
    .rdd \
    .mapPartitions(write_to_db) \
    .collect()

ref:
https://stackoverflow.com/questions/24898700/batching-within-an-apache-spark-rdd-map
https://stackoverflow.com/questions/35370826/using-spark-for-sequential-row-by-row-processing-without-map-and-reduce

Show a DataFrame

# print the schema in a tree format
df.printSchema()

# print top 20 rows
df.show()

# print top 100 rows
df.show(100)

# calculate the descriptive statistics
df.describe().show()
df.describe(['like_count', ]).show()

ref:
http://spark.apache.org/docs/latest/sql-programming-guide.html#untyped-dataset-operations-aka-dataframe-operations

Create a column with a literal value

from pyspark.sql.functions import lit

df = df.withColumn('like', lit(1))

ref:
http://stackoverflow.com/questions/33681487/how-do-i-add-a-new-column-to-a-spark-dataframe-using-pyspark

Return a fraction of a DataFrame

fraction = {
    'u1': 0.5,
    'u2': 0.5,
    'u3': 0.5,
    'u4': 0.5,
    'u5': 0.5,
}
df.sampleBy('user', fraction).show()
# +----+----+----------+
# |user|item|prediction|
# +----+----+----------+
# |  u1|  i1|         1|
# |  u3|  i4|         4|
# |  u4|  i1|         1|
# |  u4|  i2|         3|
# |  u5|  i5|         5|
# +----+----+----------+

Show distinct values of a column

df.select('user').distinct().show()

ref:
https://www.analyticsvidhya.com/blog/2016/10/spark-dataframe-and-operations/

Rename columns

df.printSchema()
# root
#  |-- from_user_id: integer (nullable = true)
#  |-- repo_id: integer (nullable = true)

df = df.toDF('user', 'item')
# or
df = df.withColumnRenamed('from_user_id', 'user').withColumnRenamed('repo_id', 'item')

Convert a column to double type

df = df.withColumn('prediction', df['prediction'].cast('double'))

ref:
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.Column.cast

Update a column based on conditions

from pyspark.sql.functions import when

df = df.withColumn('label', when(df['prediction'] > 0.5, 1).otherwise(0))

ref:
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.when
http://stackoverflow.com/questions/34908448/spark-add-column-to-dataframe-conditionally

Drop rows with null values from a DataFrame

predictions = predictions.dropna(subset=['prediction', ])

ref:
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.dropna

DataFrame subtract another DataFrame

matrix1 = [
    (1, 2, 123),
    (3, 4, 1),
    (5, 6, 45),
    (7, 8, 3424),
]
df1 = spark.createDataFrame(matrix1, ['user','item', 'play_count'])

matrix2 = [
    (3, 4),
    (5, 6),
    (9, 1),
    (7, 8),
    (1, 6),
    (1, 1),
]
df2 = spark.createDataFrame(matrix2, ['user','item'])

df2.subtract(df1.select('user', 'item')).show()
# +----+----+
# |user|item|
# +----+----+
# |   1|   1|
# |   1|   6|
# |   9|   1|
# +----+----+

# or

testing_rdd = df.rdd.subtract(training.rdd)
testing = spark.createDataFrame(testing_rdd, df.schema)

ref:
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.subtract

Convert a DataFrame column into a Python list

# list
popular_items = [row['item'] for row in popular_df.select('item').collect()]

# set
all_items = {row.id for row in als_model.itemFactors.select('id').collect()}

Concatenate (merge) two DataFrames

full_df = df1.union(df2)

Convert a DataFrame to a Python dict

d = df.rdd.collectAsMap()
d['some_key']

Compute (approximate or exact) median of a numerical column

approximate_median = df.approxQuantile('count', [0.5, ], 0.25)
exact_median = df.approxQuantile('count', [0.5, ], 0.0)

maximum = df.approxQuantile('count', [1.0, ], 0.1)
minimum = df.approxQuantile('count', [0.0, ], 0.1)

ref:
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.approxQuantile

Find frequent items for columns

df = rating_df.freqItems(['item', ], support=0.01)
# +--------------------+
# |      item_freqItems|
# +--------------------+
# |[194512, 371798, ...|
# +--------------------+

# get the value of a DataFrame column
popular_items = df.collect()[0].item_freqItems

ref:
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.freqItems

Broadcast a value

bc_candidates = sc.broadcast(set([1, 2, 4, 5, 8]))
print(bc_candidates.value)
# {8, 1, 2, 4, 5}

bc_df = sc.broadcast(df.collect())
df = spark.createDataFrame(bc_df.value)

ref:
http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables

Broadcast a DataFrame in join

import pyspark.sql.functions as F

large_df.join(F.broadcast(small_df), 'some_key')

ref:
http://stackoverflow.com/questions/34053302/pyspark-and-broadcast-join-example
https://chapeau.freevariable.com/2014/09/improving-spark-application-performance.html

Cache a DataFrame

df.cache()

ref:
http://stackoverflow.com/questions/38056774/spark-cache-vs-broadcast

Show query execution plan

df.explain(extended=True)

Use SQL to query a DataFrame

props = {'driver': 'org.sqlite.JDBC', 'date_string_format': 'yyyy-MM-dd HH:mm:ss'}
df = spark.read.jdbc("jdbc:sqlite:db.sqlite3", "app_repostarring", properties=props)
df.createOrReplaceTempView('repo_stars')

query = 'SELECT DISTINCT repo_id AS item FROM repo_stars WHERE stargazers_count > 1000'
df2 = spark.sql(query)
df2.show()

query = """
SELECT
    from_user_id AS user,
    count(repo_id) AS count
FROM repo_stars
GROUP BY from_user_id
ORDER BY count DESC
"""
df = spark.sql(query)

params = {'top_n': top_n}
query = """
SELECT 
    repo_id AS item,
    MAX(stargazers_count) AS stars
FROM repo_stars
GROUP BY repo_id
ORDER BY stars DESC
LIMIT {top_n}
""".format(**params)
popular_df = spark.sql(query)

ref:
https://sparkour.urizone.net/recipes/using-sql-udf/

WHERE ... IN ...

from pyspark.sql.functions import col

item_ids = [1, 2, 5, 8]
raw_df \
    .where(col('repo_id').isin(item_ids)) \
    .select('repo_url') \
    .collect()

ORDER BY multiple columns

import pyspark.sql.functions as F

rating_df = raw_df \
    .selectExpr('from_user_id AS user', 'repo_id AS item', '1 AS rating', 'starred_at') \
    .orderBy('user', F.col('starred_at').desc())

Aggregate

df.agg(F.min('user'), F.max('user'), F.min('item'), F.max('item')).show()

max_value = user_star_count_df.agg(F.max('stars')).collect()[0]['max(stars)']

SELECT COUNT(DISTINCT xxx) ...

import pyspark.sql.functions as F

matrix = [
    (1, 1, 1),
    (1, 2, 1),
    (2, 1, 0),
    (3, 1, 1),
    (3, 3, 1),
    (3, 4, 1),
    (4, 1, 0),
    (4, 2, 0),
    (5, 9, 1),
    (5, 5, 0),
]
df = spark.createDataFrame(matrix, ['user', 'item', 'rating'])
df.agg(F.countDistinct('user')).show()
# +--------------------+
# |count(DISTINCT user)|
# +--------------------+
# |                   5|
# +--------------------+

ref:
http://stackoverflow.com/questions/40888946/spark-dataframe-count-distinct-values-of-every-column

SELECT MAX(xxx) ... GROUP BY

df.groupBy('user').count().filter('count >= 4').show()

popular_df = raw_df \
    .groupBy('repo_id') \
    .agg(F.max('stargazers_count').alias('stars')) \
    .orderBy('stars', ascending=False) \
    .limit(top_n)

ref:
http://stackoverflow.com/questions/30616380/spark-how-to-count-number-of-records-by-key

SELECT COUNT() ... GROUP BY

prediction_df.groupBy('user').count().show()
# +------+-----+
# |  user|count|
# +------+-----+
# |649661|   15|
# |464340|   15|
# |468780|   15|
# |489233|   11|
# |455143|   14|
# +------+-----+

stargazers_count_df = rating_df \
    .groupBy('item') \
    .agg(F.count('user').alias('stargazers_count')) \
    .orderBy('stargazers_count', ascending=False)
# +--------+----------------+
# |    item|stargazers_count|
# +--------+----------------+
# | 3544424|             137|
# | 2126244|             120|
# | 7691631|             115|
# | 1362490|             112|
# |  943149|             112|
# +--------+----------------+

starred_count_df = rating_df \
    .groupBy('user') \
    .agg(F.count('item').alias('starred_count')) \
    .orderBy('starred_count', ascending=False)
# +-------+-------------+
# |   user|starred_count|
# +-------+-------------+
# |  48936|         7201|
# |4560482|         5898|
# |3382565|         3978|
# |  10652|         3586|
# |  31996|         3459|
# +-------+-------------+

You may want to use approx_count_distinct.
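
For example, assuming Spark 2.1+ where the function is named approx_count_distinct (it was approxCountDistinct before that):

import pyspark.sql.functions as F

# approximate distinct count, much cheaper than countDistinct() on large data
rating_df.agg(F.approx_count_distinct('user', rsd=0.05)).show()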

GROUP_CONCAT a column

from pyspark.sql.functions import expr

per_user_predictions_df = output_df \
  .orderBy(['user', 'prediction'], ascending=False) \
  .groupBy('user') \
  .agg(expr('collect_list(item) as items'))
# +--------+--------------------+
# |    user|               items|
# +--------+--------------------+
# |    2142|[36560369, 197450...|
# |   47217|[40693501, 643554...|
# +--------+--------------------+

ref:
http://stackoverflow.com/questions/31640729/spark-sql-replacement-for-mysql-group-concat-aggregate-function

GROUP_CONCAT multiple columns

from pyspark.sql.functions import col, collect_list, struct

matrix = [
    (1, 1, 0.1),
    (1, 2, 5.1),
    (1, 6, 0.0),
    (2, 6, 9.3),
    (3, 1, 0.54),
    (3, 5, 0.83),
    (4, 1, 0.65),
    (4, 4, 1.023),
]
df = spark.createDataFrame(matrix, ['user', 'item', 'prediction'])

df \
    .groupBy("user") \
    .agg(collect_list(struct(col('item'), col('prediction'))).alias("recommendations")) \
    .show(truncate=False)
# +----+---------------------------+
# |user|recommendations            |
# +----+---------------------------+
# |1   |[[1,0.1], [2,5.1], [6,0.0]]|
# |3   |[[1,0.54], [5,0.83]]       |
# |2   |[[6,9.3]]                  |
# |4   |[[1,0.65], [4,1.023]]      |
# +----+---------------------------+

ref:
https://stackoverflow.com/questions/37737843/aggregating-multiple-columns-with-custom-function-in-spark

SELECT ... RANK() OVER (PARTITION BY ... ORDER BY)

from pyspark.sql import Window
from pyspark.sql.functions import col
from pyspark.sql.functions import expr
import pyspark.sql.functions as F

window_spec = Window.partitionBy('from_user_id').orderBy(col('starred_at').desc())
per_user_actual_items_df = raw_df \
    .select('from_user_id', 'repo_id', 'starred_at', F.rank().over(window_spec).alias('rank')) \
    .where('rank <= 10') \
    .groupBy('from_user_id') \
    .agg(expr('collect_list(repo_id) as items')) \
    .withColumnRenamed('from_user_id', 'user')
# +--------+--------------------+
# |    user|               items|
# +--------+--------------------+
# |    2142|[29122050, 634846...|
# |   59990|[9820191, 8729416...|
# +--------+--------------------+

window_spec = Window.partitionBy('user').orderBy(col('prediction').desc())
per_user_predicted_items_df = output_df \
    .select('user', 'item', 'prediction', F.rank().over(window_spec).alias('rank')) \
    .where('rank <= 10') \
    .groupBy('user') \
    .agg(expr('collect_list(item) as items'))
# +--------+--------------------+
# |    user|               items|
# +--------+--------------------+
# |    2142|[36560369, 197450...|
# |   47217|[40693501, 643554...|
# +--------+--------------------+

ref:
https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
https://github.com/awesome-spark/spark-gotchas/blob/master/05_spark_sql_and_dataset_api.md#window-functions

Left anti join / Left excluding join

clean_df = rating_df.join(to_remove_items, 'item', 'left_anti')

ref:
https://www.codeproject.com/Articles/33052/Visual-Representation-of-SQL-Joins

Outer join

m1 = [
    (1, 1, 1),
    (1, 2, 1),
    (1, 4, 1),
    (2, 2, 1),
    (2, 3, 1),
    (3, 5, 1),
]
m1df = spark.createDataFrame(m1, ['user', 'item', 'rating'])
m1df = m1df.where('user = 1').alias('m1df')

m2 = [
    (1, 100),
    (2, 200),
    (3, 300),
    (4, 400),
    (5, 500),
    (6, 600),
]
m2df = spark.createDataFrame(m2, ['item', 'count'])
m2df = m2df.alias('m2df')

m1df.join(m2df, m1df.item == m2df.item, 'rightouter') \
.where('m1df.user IS NULL') \
.orderBy('m2df.count', ascending=False) \
.selectExpr('1 AS user', 'm2df.item', '0 AS rating') \
.show()
# +----+----+------+
# |user|item|rating|
# +----+----+------+
# |   1|   6|     0|
# |   1|   5|     0|
# |   1|   3|     0|
# +----+----+------+

ref:
https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-sql-joins.html

Cross join

df = m1df.select('user').distinct().crossJoin(m2df.select('item'))
df.show()

query = """
SELECT f1.user, f2.item, 0 AS rating
FROM f1
CROSS JOIN f2
"""
df = spark.sql(query)
df.show()

all_user_item_pair_df = als_model.userFactors.selectExpr('id AS user') \
    .crossJoin(als_model.itemFactors.selectExpr('id AS item'))
# +----+----+
# |user|item|
# +----+----+
# |xxxx|oooo|
# +----+----+

ref:
http://stackoverflow.com/questions/5464131/finding-pairs-that-do-not-exist-in-a-different-table

Spark RDD methods (Python / Scala)

Some of the methods below are available on every RDD, while others only exist on PairRDDs. Also, because I used Spark's Python API and Scala API in different projects, the examples below are a mix of both. With the Scala API you have to pay attention to the types each method accepts and returns; the Python API has no such restriction, it being a dynamic language after all.

ref:
http://spark.apache.org/docs/latest/programming-guide.html#transformations
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions

RDD Methods

map()

The function signature is def map[U](f: (T) ⇒ U): RDD[U]

The function passed to map() receives each element of the RDD as input (from a DataFrame's point of view, each row), i.e. func(row), and returns an arbitrary object (e.g. an int, a string, or a tuple). So map() produces an RDD with the same number of rows, but possibly of a different type.

matrix = [
    (1, 1, 1),
    (1, 2, 1),
    (1, 3, 1),
    (1, 6, 0),
    (2, 6, 1),
    (3, 1, 1),
    (3, 5, 1),
    (4, 1, 0),
    (4, 4, 1),
]
df = spark.createDataFrame(matrix, ['k', 'v', 'whatever'])

df.rdd.map(lambda row: (row[0], row[1])).collect()
# [(1, 1), (1, 2), (1, 3), (1, 6), (2, 6), (3, 1), (3, 5), (4, 1), (4, 4)]

import org.apache.spark.rdd.RDD

val rdd: RDD[(String, Int)] = sc.parallelize(Seq(
  ("Python", 1),
  ("Scala", 3),
  ("Java", 2)
))
val test: RDD[String] = rdd.map({
  case (lang, count) => {
    s"${lang}_${count}"
  }
})
test.collect().foreach(println)
// Python_1
// Scala_3
// Java_2

flatMap()

The function signature is def flatMap[U](f: (T) ⇒ TraversableOnce[U]): RDD[U]

flatMap() is similar to map(): the function it takes also receives each row of the RDD as input, i.e. func(row). The difference is that the function passed to flatMap() must return an Iterable (e.g. a tuple or a list, possibly empty), and flatMap() flattens the returned results. So the count() after flatMap() may be larger or smaller than that of the original RDD.

matrix = [
    (1, 1, 1),
    (1, 2, 1),
    (1, 3, 1),
    (1, 6, 0),
    (2, 6, 1),
    (3, 1, 1),
    (3, 5, 1),
    (4, 1, 0),
    (4, 4, 1),
]
df = spark.createDataFrame(matrix, ['k', 'v', 'whatever'])

df.rdd.flatMap(lambda row: (row[0], row[1])).collect()
# [1, 1, 1, 2, 1, 3, 1, 6, 2, 6, 3, 1, 3, 5, 4, 1, 4, 4]

df.rdd.flatMap(lambda row: (row[2], )).collect()
# [1, 1, 1, 0, 1, 1, 1, 0, 1]

ref:
http://backtobazics.com/big-data/spark/apache-spark-flatmap-example/
http://apachesparkbook.blogspot.tw/2015/05/difference-between-map-flatmap-in-rdd.html
http://stackoverflow.com/questions/42867457/map-each-element-of-a-list-in-spark
http://stackoverflow.com/questions/21096432/list-or-iterator-of-tuples-returned-by-map-pyspark

reduce()

The function signature is def reduce(f: (T, T) ⇒ T): T

The function passed to reduce() takes two elements of the RDD at a time, i.e. func(element1, element2), and returns a single object of the same type as its inputs. The whole reduce() therefore yields one single value.

array = [
    1,
    5,
    4,
    2,
    3,
]
rdd = sc.parallelize(array)

rdd.reduce(lambda element1, element2: element1 + element2)
# 15

def max(element1, element2):
    return element1 if element1 > element2 else element2

rdd.reduce(max)
# 5

treeReduce()

The function signature is def treeReduce(f: (T, T) ⇒ T, depth: Int = 2): T

A plain reduce() sends the reduce result of every partition straight back to the driver machine for the final computation, which can become a bottleneck when there are many partitions or each partition holds a lot of data. In that case you can switch to treeReduce(), though using it inappropriately can actually make things worse. aggregate() and treeAggregate() have the same relationship.
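
A minimal sketch of treeReduce() and treeAggregate(); the depth and partition count here are arbitrary:

rdd = sc.parallelize(range(1, 101), 50)

# same result as rdd.reduce(), but partial results are combined in a tree
# pattern instead of all being sent straight back to the driver
rdd.treeReduce(lambda x, y: x + y, depth=3)
# 5050

rdd.treeAggregate(0, lambda acc, x: acc + x, lambda acc1, acc2: acc1 + acc2, depth=3)
# 5050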

ref:
https://umbertogriffo.gitbooks.io/apache-spark-best-practices-and-tuning/content/treereduce_and_treeaggregate_demystified.html

aggregate()

The function signature is def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U): U

import org.apache.spark.rdd.RDD

case class WikipediaArticle(title: String, text: String) {
  def mentionsLanguage(lang: String): Boolean = text.split(' ').contains(lang)
}

val wikiRdd: RDD[WikipediaArticle] = sc.parallelize(Seq(
  WikipediaArticle("a1", "abc Scala xyz"),
  WikipediaArticle("a2", "abc Python xyz"),
  WikipediaArticle("a3", "abc Scala xyz"),
  WikipediaArticle("a4", "abc Scala xyz")
))

def occurrencesOfLang(lang: String, rdd: RDD[WikipediaArticle]): Int = {
  val accumulateOp = (total: Int, a: WikipediaArticle) => total + 1
  val combineOp = (count1: Int, count2: Int) => count1 + count2
  rdd.filter(_.mentionsLanguage(lang)).aggregate(0)(accumulateOp, combineOp)
}

occurrencesOfLang("Scala", wikiRdd)
// 3

ref:
https://stackoverflow.com/questions/28240706/explain-the-aggregate-functionality-in-spark

PairRDD Methods

A PairRDD is an RDD of key/value pairs, i.e. RDD[(K, V)]. K and V can be primitive types, but also other objects or collections. The methods in PairRDDFunctions all return RDD[(K, V)] as well.

ref:
https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html

mapValues()

The function signature is def mapValues[U](f: (V) ⇒ U): RDD[(K, U)]

The function passed to mapValues() receives a single argument, the V of the PairRDD; it only operates on the values.

val rdd = sc.parallelize(Seq(
  ("Python", 1),
  ("Scala", 3),
  ("Java", 2)
))

rdd
  .mapValues((count: Int) => {
    count * 10
  })
  .collect()
// Array((Python,10), (Scala,30), (Java,20))

ref:
http://apachesparkbook.blogspot.tw/2015/12/mapvalues-example.html

groupByKey()

The function signature is def groupByKey(): RDD[(K, Iterable[V])]

You should avoid groupByKey() in the following situations:

  • If the operation is expressed using groupByKey followed by an associative and commutative reducing operation on values (sum, count, max / min), it should be replaced by reduceByKey.
  • If the operation can be expressed using a combination of a local sequence operation and a merge operation (online variance / mean, top-n observations), it should be expressed with combineByKey or aggregateByKey.
  • If the final goal is to traverse values in a specific order (groupByKey followed by sorting values followed by iteration), it can typically be rewritten as repartitionAndSortWithinPartitions with a custom partitioner and ordering, followed by mapPartitions.

case class WikipediaArticle(title: String, text: String) {
  def mentionsLanguage(lang: String): Boolean = text.split(' ').contains(lang)
}

val langs = List(
  "Java", "PHP", "Python", "Scala"
)

val wikiRdd: RDD[WikipediaArticle] = sc.parallelize(Seq(
  WikipediaArticle("a1", "a1 Scala"),
  WikipediaArticle("a2", "a2 Python"),
  WikipediaArticle("a3", "a3 Scala"),
  WikipediaArticle("a4", "a4 Java"),
  WikipediaArticle("a5", "a5 Java"),
  WikipediaArticle("a6", "a6 Scala")
))

def makeIndex(langs: List[String], rdd: RDD[WikipediaArticle]): RDD[(String, Iterable[WikipediaArticle])] = {
  rdd
    .flatMap((article: WikipediaArticle) => {
      langs
        .filter((lang: String) => {
          article.mentionsLanguage(lang)
        })
        .map((lang: String) => {
          (lang, article)
        })
    })
    .groupByKey()
}

def index: RDD[(String, Iterable[WikipediaArticle])] = makeIndex(langs, wikiRdd)
// Array(
//   (Java,CompactBuffer(WikipediaArticle(a4,a4 Java), WikipediaArticle(a5,a5 Java))),
//   (Python,CompactBuffer(WikipediaArticle(a2,a2 Python))),
//   (Scala,CompactBuffer(WikipediaArticle(a1,a1 Scala), WikipediaArticle(a3,a3 Scala), WikipediaArticle(a6,a6 Scala)))
// )

def rankLangsUsingIndex(index: RDD[(String, Iterable[WikipediaArticle])]): List[(String, Int)] = {
  index
    .map({
      case (lang, articles) => {
        (lang, articles.size)
      }
    })
    .sortBy(-_._2)
    .collect()
    .toList
}

rankLangsUsingIndex(index)
// List((Scala,3),(Java,2),(Python,1),(PHP,0))

Spark Best Practices
https://github.com/beeva/beeva-best-practices/blob/master/big_data/spark/README.md

RDD actions and Transformations by Example
https://github.com/awesome-spark/spark-gotchas/blob/master/04_rdd_actions_and_transformations_by_example.md

Avoid groupByKey when performing an associative reductive operation
https://umbertogriffo.gitbooks.io/apache-spark-best-practices-and-tuning/content/avoid_groupbykey_when_performing_an_associative_re.html

reduceByKey()

The function signature is def reduceByKey(func: (V, V) ⇒ V): RDD[(K, V)]

The types of the two arguments reduceByKey() takes are determined by the V of the PairRDD. For the same result, reduceByKey() performs much better than groupByKey() + reduce().

matrix = [
    (1, 1, 1),
    (1, 2, 1),
    (1, 3, 1),
    (1, 6, 0),
    (2, 6, 1),
    (3, 1, 1),
    (3, 5, 1),
    (4, 1, 0),
    (4, 4, 1),
]
df = spark.createDataFrame(matrix, ['k', 'v', 'whatever'])

df.rdd \
    .map(lambda row: (row[0], row[1])) \
    .reduceByKey(lambda x, y: x + y) \
    .collect()
# map() => [(1, 1), (1, 2), (1, 3), (1, 6), (2, 6), (3, 1), (3, 5), (4, 1), (4, 4)]
# reduceByKey() => [(1, 12), (2, 6), (3, 6), (4, 5)]

matrix = [
    (1, [[100, 1.0], [200, 2.0]]),
    (2, [[300, 3.0]]),
    (2, [[400, 4.0], [600, 6.0]]),
    (2, [[500, 5.0]]),
]
df = spark.createDataFrame(matrix, ['user', 'recommendations'])

def merge_recommendations(recommendations1, recommendations2):
    return recommendations1 + recommendations2

def slice_recommendations(row, candidate_k):
    user, recommendations = row
    sliced_recommendations = sorted(recommendations, key=lambda recommendation: recommendation.rating, reverse=True)[:candidate_k + 5]
    return (user, sliced_recommendations)

full_rdd = df \
    .rdd \
    .reduceByKey(lambda x, y: merge_recommendations(x, y)) \
    .map(lambda row: slice_recommendations(row, candidate_k))
final_df = spark.createDataFrame(full_rdd, ['user', 'recommendations'])

val langs = List(
  "Java", "PHP", "Python", "Scala"
)

val wikiRdd: RDD[WikipediaArticle] = sc.parallelize(Seq(
  WikipediaArticle("a1", "a1 Scala"),
  WikipediaArticle("a2", "a2 Python"),
  WikipediaArticle("a3", "a3 Scala"),
  WikipediaArticle("a4", "a4 Java"),
  WikipediaArticle("a5", "a5 Java"),
  WikipediaArticle("a6", "a6 Scala")
))

def rankLangsReduceByKey(langs: List[String], rdd: RDD[WikipediaArticle]): List[(String, Int)] = {
  rdd
    .flatMap((article: WikipediaArticle) => {
      langs
        .filter((lang: String) => {
          article.mentionsLanguage(lang)
        })
        .map((lang: String) => {
          (lang, 1)
        })
    })
    .reduceByKey((count1: Int, count2: Int) => {
      count1 + count2
    })
    .collect()
    .toList
    .sortWith(_._2 > _._2)
}

rankLangsReduceByKey(langs, wikiRdd)
// List((Scala,3),(Java,2),(Python,1),(PHP,0))

Avoid reduceByKey when the input and output value types are different
http://backtobazics.com/big-data/spark/apache-spark-reducebykey-example/

Reduce a key-value pair into a key-list pair
http://stackoverflow.com/questions/27002161/reduce-a-key-value-pair-into-a-key-list-pair-with-apache-spark

foldByKey()

The function signature is def foldByKey(zeroValue: V)(func: (V, V) ⇒ V): RDD[(K, V)]

foldByKey() is basically reduceByKey() with an explicitly specified zero value.

matrix = [
    (1, 1, 1),
    (1, 2, 1),
    (1, 3, 1),
    (1, 6, 0),
    (2, 6, 1),
    (3, 1, 1),
    (3, 5, 1),
    (4, 1, 0),
    (4, 4, 1),
]
df = spark.createDataFrame(matrix, ['k', 'v', 'whatever'])

from operator import add

df.rdd \
    .map(lambda row: (row[0], [row[1], ])) \
    .foldByKey(list(), add) \
    .collect()
# [(4, [1, 4]), (1, [1, 2, 3, 6]), (2, [6]), (3, [1, 5])]

aggregateByKey()

The function signature is def aggregateByKey[U](zeroValue: U)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U): RDD[(K, U)]

It is a good replacement for patterns like .map(lambda row: (row['user'], [row['item'], ])).reduceByKey(lambda v1, v2: v1 + v2).

matrix = [
    (1, 1, 1),
    (1, 2, 1),
    (1, 4, 1),
    (1, 5, 1),
    (2, 2, 1),
    (2, 2, 1),
    (2, 3, 1),
    (3, 5, 1),
]
df = spark.createDataFrame(matrix, ['user', 'item', 'whatever'])

def seqFunc(item_set, item):
    item_set.add(item)
    return item_set

def combFunc(item_set1, item_set2):
    return item_set1.union(item_set2)

df.select('user', 'item').rdd \
    .aggregateByKey(set(), seqFunc, combFunc) \
    .collect()
# [(1, {1, 2, 4, 5}), (2, {2, 3}), (3, {5})]

ref:
http://codingjunkie.net/spark-agr-by-key/
https://stackoverflow.com/questions/31081563/apache-spark-what-is-the-equivalent-implementation-of-rdd-groupbykey-using-rd
https://stackoverflow.com/questions/24804619/how-does-spark-aggregate-function-aggregatebykey-work

combineByKey()

The function signature is def combineByKey[C](createCombiner: (V) ⇒ C, mergeValue: (C, V) ⇒ C, mergeCombiners: (C, C) ⇒ C): RDD[(K, C)]

It turns an RDD[(K, V)] into an RDD[(K, C)], where C can be any type.

combineByKey() takes three functions:

  • createCombiner, which turns a V into a C (e.g., creates a one-element list)
  • mergeValue, to merge a V into a C (e.g., adds it to the end of a list)
  • mergeCombiners, to combine two C’s into a single one.

import pyspark.sql.functions as F

matrix = [
    ('chinese', 80),
    ('math', 60),
    ('english', 100),
    ('chinese', 90),
    ('math', 100),
    ('math', 10),
    ('english', 70),
    ('english', 20),
    ('history', 30),
]
df = spark.createDataFrame(matrix, ['subject', 'score'])

def createCombiner(score):
    return (score, 1)

def mergeValue(accumulate, score):
    total_score = accumulate[0] + score
    total_count = accumulate[1] + 1
    return (total_score, total_count)

def mergeCombiners(accumulate1, accumulate2):
    total_score = accumulate1[0] + accumulate2[0]
    total_count = accumulate1[1] + accumulate2[1]
    return (total_score, total_count)

df.rdd.combineByKey(createCombiner, mergeValue, mergeCombiners).collect()
# you could calculate the average score of every subject
# [('chinese', (170, 2)),
#  ('history', (30, 1)),
#  ('math', (170, 3)),
#  ('english', (190, 3))]

ref:
https://zhangyi.gitbooks.io/spark-in-action/content/chapter2/rdd.html

cogroup()

If you want to join two RDDs that have already been grouped by key, use cogroup(). Avoid the flatMap + join + groupBy pattern.
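
A minimal cogroup() sketch with made-up data; for every key you get a pair of iterables, one from each RDD:

scores = sc.parallelize([
    ('Python', 1),
    ('Scala', 3),
    ('Scala', 5),
])
years = sc.parallelize([
    ('Python', 1991),
    ('Scala', 2004),
    ('Java', 1995),
])

scores.cogroup(years) \
    .mapValues(lambda pair: (sorted(pair[0]), sorted(pair[1]))) \
    .collect()
# the ordering of keys may vary, e.g.:
# [('Python', ([1], [1991])), ('Scala', ([3, 5], [2004])), ('Java', ([], [1995]))]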