Spark troubleshooting

Spark troubleshooting

Apache Spark 2.x Troubleshooting Guide
https://www.slideshare.net/jcmia1/a-beginners-guide-on-troubleshooting-spark-applications
https://www.slideshare.net/jcmia1/apache-spark-20-tuning-guide

Check your cluster UI to ensure that workers are registered and have sufficient resources

PYSPARK_DRIVER_PYTHON="jupyter" \
PYSPARK_DRIVER_PYTHON_OPTS="notebook --ip 0.0.0.0" \
pyspark \
--packages "org.xerial:sqlite-jdbc:3.16.1,com.github.fommil.netlib:all:1.1.2" \
--driver-memory 4g \
--executor-memory 20g \
--master spark://TechnoCore.local:7077
TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources

可能是你指定的 --executor-memory 超過了 worker 的 memory。

你可以在 Spark Master UI http://localhost:8080/ 看到各個 worker 總共有多少 memory 可以用。如果每台 worker 可以用的 memory 容量不同,Spark 就只會選擇那些 memory 大於 --executor-memory 的 workers。

ref:
https://spoddutur.github.io/spark-notes/distribution_of_executors_cores_and_memory_for_spark_application

SparkContext was shut down

ERROR Executor: Exception in task 1.0 in stage 6034.0 (TID 21592)
java.lang.StackOverflowError
...
ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerJobEnd(55,1494185401195,JobFailed(org.apache.spark.SparkException: Job 55 cancelled because SparkContext was shut down))

可能是 executor 的記憶體不夠,導致 Out Of Memory (OOM) 了。

ref:
http://stackoverflow.com/questions/32822948/sparkcontext-was-shut-down-while-running-spark-on-a-large-dataset

Container exited with a non-zero exit code 56 (or some other numbers)

WARN org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Container marked as failed: container_1504241464590_0001_01_000002 on host: albedo-w-1.c.albedo-157516.internal. Exit status: 56. Diagnostics: Exception from container-launch.
Container id: container_1504241464590_0001_01_000002
Exit code: 56
Stack trace: ExitCodeException exitCode=56:
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:972)
    at org.apache.hadoop.util.Shell.run(Shell.java:869)
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1170)
    at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:236)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:305)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:84)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)

Container exited with a non-zero exit code 56

可能是 executor 的記憶體不夠,導致 Out Of Memory (OOM) 了。

ref:
http://stackoverflow.com/questions/39038460/understanding-spark-container-failure

Exception in thread "main" java.lang.StackOverflowError

Exception in thread "main" java.lang.StackOverflowError
    at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at scala.collection.immutable.List$SerializationProxy.writeObject(List.scala:468)
    at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    ...

解決辦法:

import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.sql.SparkSession

val spark: SparkSession = SparkSession.builder().getOrCreate()
val sc = spark.sparkContext
sc.setCheckpointDir("./spark-data/checkpoint")

// 因為 sc.setCheckpointDir() 就會啟用 checkpoint 了
// 所以可以不用特別指定 checkpointInterval
val als = new ALS()
  .setCheckpointInterval(2)

ref:
https://stackoverflow.com/questions/31484460/spark-gives-a-stackoverflowerror-when-training-using-als
https://stackoverflow.com/questions/35127720/what-is-the-difference-between-spark-checkpoint-and-persist-to-a-disk

Randomness of hash of string should be disabled via PYTHONHASHSEED

解決辦法:

$ cd $SPARK_HOME
$ cp conf/spark-env.sh.template conf/spark-env.sh
$ echo "export PYTHONHASHSEED=42" >> conf/spark-env.sh

ref:
https://issues.apache.org/jira/browse/SPARK-13330

It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transforamtion

Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

因為 spark.sparkContext 只能在 driver program 裡存取,不能被 worker 存取(例如那些丟給 RDD 執行的 lambda function 或是 UDF 就是在 worker 上執行的)。

ref:
https://spark.apache.org/docs/latest/rdd-programming-guide.html#passing-functions-to-spark
https://engineering.sharethrough.com/blog/2013/09/13/top-3-troubleshooting-tips-to-keep-you-sparking/

Spark automatically creates closures:

  • for functions that run on RDDs at workers,
  • and for any global variables that are used by those workers.

One closure is send per worker for every task. Closures are one way from the driver to the worker.

ref:
https://gerardnico.com/wiki/spark/closure

Unable to find encoder for type stored in a Dataset

Unable to find encoder for type stored in a Dataset.  Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases. someDF.as[SomeCaseClass]

解決辦法:

import spark.implicits._

yourDF.as[YourCaseClass]

ref:
https://stackoverflow.com/questions/38664972/why-is-unable-to-find-encoder-for-type-stored-in-a-dataset-when-creating-a-dat

Task not serializable

Caused by: java.io.NotSerializableException: Settings
Serialization stack:
    - object not serializable (class: Settings, value: Settings@2dfe2f00)
    - field (class: Settings$$anonfun$1, name: $outer, type: class Settings)
    - object (class Settings$$anonfun$1, <function1>)
Caused by: org.apache.spark.SparkException:
    Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)

通常是你在 closure functions 裡使用了 driver program 裡的某個 object,因為 Spark 會自動 serialize 那個被引用的 object 一起丟給 worker node 執行,所以如果那個 object 或是 class 沒辦法被 serialize,就會出現這個錯誤。

ref:
https://www.safaribooksonline.com/library/view/spark-the-definitive/9781491912201/ch04.html#user-defined-functions
http://www.puroguramingu.com/2016/02/26/spark-dos-donts.html
https://stackoverflow.com/questions/36176011/spark-sql-udf-task-not-serialisable
https://stackoverflow.com/questions/22592811/task-not-serializable-java-io-notserializableexception-when-calling-function-ou
https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/troubleshooting/javaionotserializableexception.html
https://mp.weixin.qq.com/s/BT6sXZlHcufAFLgTONCHsg

如果你只有在 Databricks Notebook 裡遇到這個錯誤,因為 Notebook 的運作機制跟一般的 Spark application 稍微有點不同,你可以試試 package cell。

ref:
https://docs.databricks.com/user-guide/notebooks/package-cells.html

java.lang.IllegalStateException: Cannot find any build directories.

java.lang.IllegalStateException: Cannot find any build directories.
    at org.apache.spark.launcher.CommandBuilderUtils.checkState(CommandBuilderUtils.java:248)
    at org.apache.spark.launcher.AbstractCommandBuilder.getScalaVersion(AbstractCommandBuilder.java:240)
    at org.apache.spark.launcher.AbstractCommandBuilder.buildClassPath(AbstractCommandBuilder.java:194)
    at org.apache.spark.launcher.AbstractCommandBuilder.buildJavaCommand(AbstractCommandBuilder.java:117)
    at org.apache.spark.launcher.WorkerCommandBuilder.buildCommand(WorkerCommandBuilder.scala:39)
    at org.apache.spark.launcher.WorkerCommandBuilder.buildCommand(WorkerCommandBuilder.scala:45)
    at org.apache.spark.deploy.worker.CommandUtils$.buildCommandSeq(CommandUtils.scala:63)
    at org.apache.spark.deploy.worker.CommandUtils$.buildProcessBuilder(CommandUtils.scala:51)
    at org.apache.spark.deploy.worker.ExecutorRunner.org$apache$spark$deploy$worker$ExecutorRunner$$fetchAndRunExecutor(ExecutorRunner.scala:145)
    at org.apache.spark.deploy.worker.ExecutorRunner$$anon$1.run(ExecutorRunner.scala:73)

可能的原因是沒有設置 SPARK_HOME 或是你的 launch script 沒有讀到該環境變數。

Build a recommender system with Spark: Implicit ALS

Build a recommender system with Spark: Implicit ALS

在這個系列的文章裡,我們將使用 Apache Spark、XGBoost、Elasticsearch 和 MySQL 等工具來搭建一個推薦系統的 Machine Learning Pipeline。推薦系統的組成可以粗略地分成 Candidate Generation 和 Ranking 兩個部分,前者是針對用戶產生候選物品集,常用的方法有 Collaborative Filtering、Content-based、標籤配對、熱門排行或人工精選等;後者則是對這些候選物品排序,以 Top N 的方式呈現最終的推薦結果,常用的方法有 Logistic Regression。

在本篇文章中,我們將以 Candidate Generation 階段常用的方法之一:Collaborative Filtering 協同過濾演算法為例,利用 Apache Spark 的 ALS (Alternating Least Squares) 模型建立一個 GitHub repositories 的推薦系統,以用戶對 repo 的打星紀錄作為訓練數據,推薦出用戶可能會感興趣的其他 repo 作為候選物品集。

完整的程式碼可以在 https://github.com/vinta/albedo 找到。

系列文章:

Submit the Application

因為需要使用 JDBC 讀取 MySQL 資料庫,必須安裝 MySQL driver,可以透過 --packages "mysql:mysql-connector-java:5.1.41" 參數在 cluster 的每一台機器上安裝需要的 Java packages。

$ spark-submit \
--packages "com.github.fommil.netlib:all:1.1.2,mysql:mysql-connector-java:5.1.41" \
--master spark://YOUR_SPARK_MASTER:7077 \
--py-files deps.zip \
train_als.py -u vinta

ref:
https://spark.apache.org/docs/latest/submitting-applications.html

Load Data

讀取來自 MySQL 資料庫的數據。你可以使用 predicates 參數來指定 WHERE 條件,雖然嚴格來說這個參數是用來控制 partition 數量的,一個條件就是一個 partition。

假設 app_repostarring 的欄位如下:

CREATE TABLE `app_repostarring` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `from_user_id` int(11) NOT NULL,
  `from_username` varchar(39) NOT NULL,
  `repo_owner_id` int(11) NOT NULL,
  `repo_owner_username` varchar(39) NOT NULL,
  `repo_owner_type` varchar(16) NOT NULL,
  `repo_id` int(11) NOT NULL,
  `repo_name` varchar(100) NOT NULL,
  `repo_full_name` varchar(140) NOT NULL,
  `repo_description` varchar(191) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT NULL,
  `repo_language` varchar(32) NOT NULL,
  `repo_created_at` datetime(6) NOT NULL,
  `repo_updated_at` datetime(6) NOT NULL,
  `starred_at` datetime(6) NOT NULL,
  `stargazers_count` int(11) NOT NULL,
  `forks_count` int(11) NOT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `from_user_id_repo_id` (`full_name`, `repo_id`)
);
def loadRawData():
    url = 'jdbc:mysql://127.0.0.1:3306/albedo?user=root&password=123&verifyServerCertificate=false&useSSL=false'
    properties = {'driver': 'com.mysql.jdbc.Driver'}
    rawDF = spark.read.jdbc(url, table='app_repostarring', properties=properties)
    return rawDF

rawDF = loadRawData()

ref:
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=jdbc#pyspark.sql.DataFrameReader.jdbc
http://www.gatorsmile.io/numpartitionsinjdbc/

Preprocess Data

Format Data

把 raw data 整理成 user,item,rating,starred_at 這樣的格式。starred_at 只有評價 model 時有用來排序,訓練 model 時並沒有用到,因為 Spark 的 ALS 沒辦法輕易地整合 side information。

from pyspark.ml import Transformer

class RatingBuilder(Transformer):

    def _transform(self, rawDF):
        ratingDF = rawDF \
            .selectExpr('from_user_id AS user', 'repo_id AS item', '1 AS rating', 'starred_at') \
            .orderBy('user', F.col('starred_at').desc())
        return ratingDF

ratingBuilder = RatingBuilder()
ratingDF = ratingBuilder.transform(rawDF)
ratingDF.cache()

ref:
http://blog.ethanrosenthal.com/2016/11/07/implicit-mf-part-2/

Inspect Data

import pyspark.sql.functions as F

ratingDF.rdd.getNumPartitions()
# 200

ratingDF.agg(F.count('rating'), F.countDistinct('user'), F.countDistinct('item')).show()
# +-------------+--------------------+--------------------+
# |count(rating)|count(DISTINCT user)|count(DISTINCT item)|
# +-------------+--------------------+--------------------+
# |      3121629|               10483|              551216|
# +-------------+--------------------+--------------------+

stargazersCountDF = ratingDF \
    .groupBy('item') \
    .agg(F.count('user').alias('stargazers_count')) \
    .orderBy('stargazers_count', ascending=False)
stargazersCountDF.show(10)
# +--------+----------------+
# |    item|stargazers_count|
# +--------+----------------+
# | 2126244|            2211|
# |10270250|            1683|
# |  943149|            1605|
# |  291137|            1567|
# |13491895|            1526|
# | 9384267|            1480|
# | 3544424|            1468|
# | 7691631|            1441|
# |29028775|            1427|
# | 1334369|            1399|
# +--------+----------------+

starredCountDF = ratingDF \
    .groupBy('user') \
    .agg(F.count('item').alias('starred_count')) \
    .orderBy('starred_count', ascending=False)
starredCountDF.show(10)
# +-------+-------------+
# |   user|starred_count|
# +-------+-------------+
# |3947125|         8947|
# |5527642|         7978|
# | 446613|         7860|
# | 627410|         7800|
# |  13998|         6334|
# |2467194|         6327|
# |  63402|         6034|
# |2005841|         6024|
# |5073946|         5980|
# |   2296|         5862|
# +-------+-------------+

Clean Data

你可以過濾掉那些太少 user 打星的 item 和打星了太少 item 的 user,提昇矩陣的稠密度。這個現象也正好是 Cold Start 的問題,你就是沒有足夠多的關於這些 item 和 user 的數據(可以考慮使用 content-based 的推薦方式)。除此之外,如果你的推薦系統所推薦的 item 只有非常少人打星,即便你完美地挖掘了長尾效應,這樣的推薦結果給用戶的「第一印象」可能也不會太好(這可能決定了他要不要繼續使用這個系統或是他要不要真的去嘗試那個你推薦給他的東西)。

你也可以選擇要不要過濾掉那些超多人打星的 item 和打星了超多 item 的 user。如果某些 item 有超過八、九成的 user 都打星了,對於這麼熱門的 item,可能也沒有推薦的必要了,因為其他 user 早晚也會自己發現的;如果有少數的 user 幾乎打星了一半以上的 item,這些 user 可能是屬於某種 web crawler 的用途或是這些 user 就是那種看到什麼就打星什麼的人,無論是哪一種,他們可能都不是你想要 modeling 的對象,可以考慮從 dataset 中拿掉。

實務上,如果你有關於 user 或 item 的黑名單,例如一些 SPAM 帳號或 NSFW 的內容等,也可以在這個步驟把它們過濾掉。

from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import Param
import pyspark.sql.functions as F

class DataCleaner(Transformer):

    @keyword_only
    def __init__(self, minItemStargazersCount=None, maxItemStargazersCount=None, minUserStarredCount=None, maxUserStarredCount=None):
        super(DataCleaner, self).__init__()
        self.minItemStargazersCount = Param(self, 'minItemStargazersCount', '移除 stargazer 數低於這個數字的 item')
        self.maxItemStargazersCount = Param(self, 'maxItemStargazersCount', '移除 stargazer 數超過這個數字的 item')
        self.minUserStarredCount = Param(self, 'minUserStarredCount', '移除 starred repo 數低於這個數字的 user')
        self.maxUserStarredCount = Param(self, 'maxUserStarredCount', '移除 starred repo 數超過這個數字的 user')
        self._setDefault(minItemStargazersCount=1, maxItemStargazersCount=50000, minUserStarredCount=1, maxUserStarredCount=50000)
        kwargs = self.__init__._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, minItemStargazersCount=None, maxItemStargazersCount=None, minUserStarredCount=None, maxUserStarredCount=None):
        kwargs = self.setParams._input_kwargs
        return self._set(**kwargs)

    def setMinItemStargazersCount(self, value):
        self._paramMap[self.minItemStargazersCount] = value
        return self

    def getMinItemStargazersCount(self):
        return self.getOrDefault(self.minItemStargazersCount)

    def setMaxItemStargazersCount(self, value):
        self._paramMap[self.maxItemStargazersCount] = value
        return self

    def getMaxItemStargazersCount(self):
        return self.getOrDefault(self.maxItemStargazersCount)

    def setMinUserStarredCount(self, value):
        self._paramMap[self.minUserStarredCount] = value
        return self

    def getMinUserStarredCount(self):
        return self.getOrDefault(self.minUserStarredCount)

    def setMaxUserStarredCount(self, value):
        self._paramMap[self.maxUserStarredCount] = value
        return self

    def getMaxUserStarredCount(self):
        return self.getOrDefault(self.maxUserStarredCount)

    def _transform(self, ratingDF):
        minItemStargazersCount = self.getMinItemStargazersCount()
        maxItemStargazersCount = self.getMaxItemStargazersCount()
        minUserStarredCount = self.getMinUserStarredCount()
        maxUserStarredCount = self.getMaxUserStarredCount()

        toKeepItemsDF = ratingDF \
            .groupBy('item') \
            .agg(F.count('user').alias('stargazers_count')) \
            .where('stargazers_count >= {0} AND stargazers_count <= {1}'.format(minItemStargazersCount, maxItemStargazersCount)) \
            .orderBy('stargazers_count', ascending=False) \
            .select('item', 'stargazers_count')
        temp1DF = ratingDF.join(toKeepItemsDF, 'item', 'inner')

        toKeepUsersDF = temp1DF \
            .groupBy('user') \
            .agg(F.count('item').alias('starred_count')) \
            .where('starred_count >= {0} AND starred_count <= {1}'.format(minUserStarredCount, maxUserStarredCount)) \
            .orderBy('starred_count', ascending=False) \
            .select('user', 'starred_count')
        temp2DF = temp1DF.join(toKeepUsersDF, 'user', 'inner')

        cleanDF = temp2DF.select('user', 'item', 'rating', 'starred_at')
        return cleanDF

dataCleaner = DataCleaner(
    minItemStargazersCount=2,
    maxItemStargazersCount=4000,
    minUserStarredCount=2,
    maxUserStarredCount=5000
)
cleanDF = dataCleaner.transform(ratingDF)

cleanDF.agg(F.count('rating'), F.countDistinct('user'), F.countDistinct('item')).show()
# +-------------+--------------------+--------------------+
# |count(rating)|count(DISTINCT user)|count(DISTINCT item)|
# +-------------+--------------------+--------------------+
# |      2761118|               10472|              245626|
# +-------------+--------------------+--------------------+

Generate Negative Samples

對 implicit feedback 的 ALS 來說,手動加入負樣本(Rui = 0 的樣本)是沒有意義的,因為 missing value / non-observed value 對該演算法來說本來就是 0,表示用戶確實沒有對該物品做出行為,也就是 Pui = 0 沒有偏好,所以 Cui = 1 + alpha x 0 置信度也會比其他正樣本低。不過因為 Spark ML 的 ALS 只會計算 Rui > 0 的項目,所以即便你手動加入了 Rui = 0 或 Rui = -1 的負樣本,對整個模型其實沒有影響。

雖然沒有負樣本你就不能算 area under ROC curve 或是 area under Precision-Recall curve 等 binary classifier 用的指標,不過你可以改用 Learning to rank 的評估方式,例如 NDCG 或 Mean Average Precision 等。但是 ALS 的 loss function 也沒辦法直接優化 NDCG 這樣的指標就是了。

ref:
https://vinta.ws/code/generate-negative-samples-for-recommender-system.html

Split Data

因為 Matrix Factorization 需要考慮每個 user-item pair,如果你餵給 model 它沒見過的資料,它就沒辦法進行推薦(冷啟動問題)。只要 user 或 item 其中之一不存在於 dataset 裡,ALS model 所輸出的 prediction 值就會是 NaN。所以應該盡量保持每個 user 和 item 都出現在 training set 和 testing set 裡,例如隨機挑出每個 user 的任意 n 個或 n 比例的評分作為 test set,剩下的評分當作 training set(俗稱 leave-n-out)。如果使用 Machine Learning 中常見的 holdout 方式,隨機地把所有 data point 分散到 training set 和 test set(例如 df.randomSplit([0.7, 0.3])),會有很高的機率造成部分 user 或 item 只出現在其中一組 dataset 裡。

ref:
https://jessesw.com/Rec-System/
http://blog.ethanrosenthal.com/2016/10/19/implicit-mf-part-1/

從 LibRec 的文件上也可以發現還有許多拆分數據的方式,例如:

  • 基于 Ratio 的分类方法为通过给定的比例来将数据分为两部分。这个分类过程可以在所有数据中进行随机分类,也可以在用户或者物品维度上进行分类。当有时间的特征时,可以根据时间顺序留出最后一定比例的数据来进行测试。
  • LooCV 的分割方法为 leave-one-user/item/rating-out,也就是随机选取每个 user 的任意一个 item 或者每个 item 的任意一个 user 作为测试数据,余下的数据来作为训练数据。在实现中实现了基于 User 和基于 Item 的多种分类方式。
  • GivenN 分割方法是指为每个用户留出指定数目 N 的数据来作为测试用例,余下的样本作为训练数据。
  • KCV 即 K 折交叉验证。将数据分割为 K 份,在每次执行时选择其中一份作为测试数据,余下的数据作为训练数据,共执行 K 次。综合 K 次的训练结果来对推荐算法的性能进行评估。

ref:
https://www.librec.net/dokuwiki/doku.php?id=DataModel_zh#splitter

這裡我們用 sampleBy() 簡單地寫了一個根據 user 來隨機劃分 item 到 training set 和 test set 的方法。

def randomSplitByUser(df, weights, seed=None):
    trainingRation = weights[0]
    fractions = {row['user']: trainingRation for row in df.select('user').distinct().collect()}
    training = df.sampleBy('user', fractions, seed)
    testRDD = df.rdd.subtract(training.rdd)
    test = spark.createDataFrame(testRDD, df.schema)
    return training, test

training, test = randomSplitByUser(ratingDF, weights=[0.7, 0.3])

ref:
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.sampleBy

Train the Model

from pyspark.ml.recommendation import ALS

als = ALS(implicitPrefs=True, seed=42) \
    .setRank(50) \
    .setMaxIter(22) \
    .setRegParam(0.5) \
    .setAlpha(40)

alsModel = als.fit(training)

# 這些就是訓練出來的 user 和 item 的 Latent Factors
alsModel.userFactors.show()
alsModel.itemFactors.show()

ref:
https://spark.apache.org/docs/latest/ml-collaborative-filtering.html
https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.recommendation.ALS

Predict Preferences

from pyspark.ml import Transformer

predictedDF = alsModel.transform(testing)

class PredictionProcessor(Transformer):

    def _transform(self, predictedDF):
        nonNullDF = predictedDF.dropna(subset=['prediction', ])
        predictionDF = nonNullDF.withColumn('prediction', nonNullDF['prediction'].cast('double'))
        return predictionDF

# 刪掉那些 NaN 的數據
predictionProcessor = PredictionProcessor()
predictionDF = predictionProcessor.transform(predictedDF)

Evaluate the Model

因為 Spark ML 沒有提供給 DataFrame 用的 ranking evaluator,我們只好自己寫一個,但是內部還是使用 Spark MLlib 的 RankingMetrics。不過這個只是 offline 的評估方式而已,等到要實際上線的時候可能還需要做 A/B testing。

from pyspark import keyword_only
from pyspark.ml.evaluation import Evaluator
from pyspark.ml.param.shared import Param
from pyspark.mllib.evaluation import RankingMetrics
from pyspark.sql import Window
from pyspark.sql.functions import col
from pyspark.sql.functions import expr
import pyspark.sql.functions as F

class RankingEvaluator(Evaluator):

    @keyword_only
    def __init__(self, k=None):
        super(RankingEvaluator, self).__init__()
        self.k = Param(self, 'k', 'Top K')
        self._setDefault(k=30)
        kwargs = self.__init__._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, k=None):
        kwargs = self.setParams._input_kwargs
        return self._set(**kwargs)

    def isLargerBetter(self):
        return True

    def setK(self, value):
        self._paramMap[self.k] = value
        return self

    def getK(self):
        return self.getOrDefault(self.k)

    def _evaluate(self, predictedDF):
        k = self.getK()

        predictedDF.show()

        windowSpec = Window.partitionBy('user').orderBy(col('prediction').desc())
        perUserPredictedItemsDF = predictedDF \
            .select('user', 'item', 'prediction', F.rank().over(windowSpec).alias('rank')) \
            .where('rank <= {0}'.format(k)) \
            .groupBy('user') \
            .agg(expr('collect_list(item) as items'))

        windowSpec = Window.partitionBy('user').orderBy(col('starred_at').desc())
        perUserActualItemsDF = predictedDF \
            .select('user', 'item', 'starred_at', F.rank().over(windowSpec).alias('rank')) \
            .where('rank <= {0}'.format(k)) \
            .groupBy('user') \
            .agg(expr('collect_list(item) as items'))

        perUserItemsRDD = perUserPredictedItemsDF.join(F.broadcast(perUserActualItemsDF), 'user', 'inner') \
            .rdd \
            .map(lambda row: (row[1], row[2]))

        if perUserItemsRDD.isEmpty():
            return 0.0

        rankingMetrics = RankingMetrics(perUserItemsRDD)
        metric = rankingMetrics.ndcgAt(k)
        return metric

k = 30
rankingEvaluator = RankingEvaluator(k=k)
ndcg = rankingEvaluator.evaluate(predictionDF)
print('NDCG', ndcg)

ref:
https://spark.apache.org/docs/latest/mllib-evaluation-metrics.html

Recommend Items

實際感受一下推薦系統的效果如何。這裡是直接把結果 print 出來,而沒有把推薦結果儲存到資料庫。不過通常不會直接就把推薦系統輸出的東西展示給用戶,會先經過一些過濾、排序和產生推薦理由等等的步驟,或是加入一些人為的規則,比如說強制插入廣告、最近主打的商品或是過濾掉那些很多人點擊但是其實質量並不怎麼樣的東西。當然也有可能會把這個推薦系統的輸出作為其他機器學習 model 的輸入。

ref:
https://www.zhihu.com/question/28247353

def recommendItems(rawDF, alsModel, username, topN=30, excludeKnownItems=False):
    userID = rawDF \
        .where('from_username = "{0}"'.format(username)) \
        .select('from_user_id') \
        .take(1)[0]['from_user_id']

    userItemsDF = alsModel \
        .itemFactors. \
        selectExpr('{0} AS user'.format(userID), 'id AS item')
    if excludeKnownItems:
        userKnownItemsDF = rawDF \
            .where('from_user_id = {0}'.format(userID)) \
            .selectExpr('repo_id AS item')
        userItemsDF = userItemsDF.join(userKnownItemsDF, 'item', 'left_anti')

    userPredictedDF = alsModel \
        .transform(userItemsDF) \
        .select('item', 'prediction') \
        .orderBy('prediction', ascending=False) \
        .limit(topN)

    repoDF = rawDF \
        .groupBy('repo_id', 'repo_full_name', 'repo_language') \
        .agg(F.max('stargazers_count').alias('stargazers_count'))

    recommendedItemsDF = userPredictedDF \
        .join(repoDF, userPredictedDF['item'] == repoDF['repo_id'], 'inner') \
        .select('prediction', 'repo_full_name', 'repo_language', 'stargazers_count') \
        .orderBy('prediction', ascending=False)

    return recommendedItemsDF

k = 30
username = 'vinta'
recommendedItemsDF = recommendItems(rawDF, alsModel, username, topN=k, excludeKnownItems=False)
for item in recommendedItemsDF.collect():
    repoName = item['repo_full_name']
    repoUrl = 'https://github.com/{0}'.format(repoName)
    print(repoUrl, item['prediction'], item['repo_language'], item['stargazers_count'])

ref:
https://github.com/vinta/albedo/blob/master/src/main/python/train_als.ipynb

Cross-validate Models

使用 Spark ML 的 pipeline 來做 cross-validation,選出最適合的 hyperparameters 組合。

  • rank: The number of latent factors in the model, or equivalently, the number of columns k in the user-feature and product-feature matrices.
  • regParam: A standard overfitting parameter, also usually called lambda. Higher values resist overfitting, but values that are too high hurt the factorization’s accuracy.
  • alpha: Controls the relative weight of observed versus unobserved user-product interactions in the factorization.
  • maxIter: The number of iterations that the factorization runs. More iterations take more time but may produce a better factorization.
dataCleaner = DataCleaner()

als = ALS(implicitPrefs=True, seed=42)

predictionProcessor = PredictionProcessor()

pipeline = Pipeline(stages=[
    dataCleaner,
    als,
    predictionProcessor,
])

paramGrid = ParamGridBuilder() \
    .addGrid(dataCleaner.minItemStargazersCount, [1, 10, 100]) \
    .addGrid(dataCleaner.maxItemStargazersCount, [4000, ]) \
    .addGrid(dataCleaner.minUserStarredCount, [1, 10, 100]) \
    .addGrid(dataCleaner.maxUserStarredCount, [1000, 4000, ]) \
    .addGrid(als.rank, [50, 100]) \
    .addGrid(als.regParam, [0.01, 0.1, 0.5]) \
    .addGrid(als.alpha, [0.01, 0.89, 1, 40, ]) \
    .addGrid(als.maxIter, [22, ]) \
    .build()

rankingEvaluator = RankingEvaluator(k=30)

cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=paramGrid,
                    evaluator=rankingEvaluator,
                    numFolds=2)

cvModel = cv.fit(ratingDF)

def printCrossValidationParameters(cvModel):
    metric_params_pairs = list(zip(cvModel.avgMetrics, cvModel.getEstimatorParamMaps()))
    metric_params_pairs.sort(key=lambda x: x[0], reverse=True)
    for pair in metric_params_pairs:
        metric, params = pair
        print('metric', metric)
        for k, v in params.items():
            print(k.name, v)
        print('')

printCrossValidationParameters(cvModel)

ref:
https://spark.apache.org/docs/latest/ml-pipeline.html

Generate negative samples for recommender system?

Generate negative samples for recommender system?

根據「推荐系统实践」,挑選負樣本時應該遵循以下原則:

  • 对每个用户,要保证正负样本的平衡(数目相似)。
  • 对每个用户采样负样本时,要选取那些很热门,而用户却没有行为的物品。
  • 一般认为,很热门而用户却没有行为更加代表用户对这个物品不感兴趣。因为对于冷门的物品,用户可能是压根没在网站中发现这个物品,所以谈不上是否感兴趣。

ref:
http://www.duokan.com/reader/www/app.html?id=ed873c9e323511e28a9300163e0123ac

不過如果你是用 Spark ML 的 ALS(implicitPrefs=True) 的話,並不需要手動加入負樣本。對 implicit feedback 的 ALS 來說,手動加入負樣本(Rui = 0 的樣本)是沒有意義的,因為 missing value / non-observed value 對該演算法來說本來就是 0,表示用戶確實沒有對該物品做出行為,也就是 Pui = 0 沒有偏好,所以 Cui = 1 + alpha x 0 置信度也會比其他正樣本低。不過因為 Spark ML 的 ALS 只會計算 Rui > 0 的項目,所以即便你手動加入了 Rui = 0 或 Rui = -1 的負樣本,對整個模型其實沒有影響。

用以下這三組 dataset 訓練出來的模型都是一樣的:

from pyspark.ml.recommendation import ALS

matrix = [
    (1, 1, 0),
    (1, 2, 1),
    (1, 3, 0),
    (1, 4, 1),
    (1, 5, 1),
    (2, 1, 1),
    (2, 2, 1),
    (2, 3, 0),
    (2, 4, 1),
    (2, 5, 1),
    (3, 1, 1),
    (3, 2, 1),
    (3, 3, 1),
    (3, 4, 1),
    (3, 5, 0),
]
df0 = spark.createDataFrame(matrix, ['user', 'item', 'rating'])

matrix = [
    (1, 1, -1),
    (1, 2, 1),
    (1, 3, -1),
    (1, 4, 1),
    (1, 5, 1),
    (2, 1, 1),
    (2, 2, 1),
    (2, 3, -1),
    (2, 4, 1),
    (2, 5, 1),
    (3, 1, 1),
    (3, 2, 1),
    (3, 3, 1),
    (3, 4, 1),
    (3, 5, -1),
]
df1 = spark.createDataFrame(matrix, ['user', 'item', 'rating'])

matrix = [
    (1, 2, 1),
    (1, 4, 1),
    (1, 5, 1),
    (2, 1, 1),
    (2, 2, 1),
    (2, 4, 1),
    (2, 5, 1),
    (3, 1, 1),
    (3, 2, 1),
    (3, 3, 1),
    (3, 4, 1),
]
df2 = spark.createDataFrame(matrix, ['user', 'item', 'rating'])

als = ALS(implicitPrefs=True, seed=42, nonnegative=False).setRank(7).setMaxIter(15).setRegParam(0.01).setAlpha(40)
alsModel = als.fit(df0)
alsModel.userFactors.select('features').show(truncate=False)
alsModel.itemFactors.select('features').show(truncate=False)

als = ALS(implicitPrefs=True, seed=42, nonnegative=False).setRank(7).setMaxIter(15).setRegParam(0.01).setAlpha(40)
alsModel = als.fit(df1)
alsModel.userFactors.select('features').show(truncate=False)
alsModel.itemFactors.select('features').show(truncate=False)

als = ALS(implicitPrefs=True, seed=42, nonnegative=False).setRank(7).setMaxIter(15).setRegParam(0.01).setAlpha(40)
alsModel = als.fit(df2)
alsModel.userFactors.select('features').show(truncate=False)
alsModel.itemFactors.select('features').show(truncate=False)

ref:
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L1626
https://github.com/apache/spark/commit/b05b3fd4bacff1d8b1edf4c710e7965abd2017a7
https://www.mail-archive.com/[email protected]/msg60240.html
http://apache-spark-user-list.1001560.n3.nabble.com/implicit-ALS-dataSet-td7067.html

Spark ML cookbook (Python)

Spark ML cookbook (Python)

Calculate percentage of sparsity of a user-item rating DataFrame

result = ratingDF.agg(F.count('rating'), F.countDistinct('user'), F.countDistinct('item')).collect()[0]
totalUserCount = result['count(DISTINCT user)']
totalItemCount = result['count(DISTINCT item)']
zonZeroRatingCount = result['count(rating)']

density = (zonZeroRatingCount / (totalUserCount * totalItemCount)) * 100
sparsity = 100 - density

Recommend items for single user

topN = 30
userID = 123

userItemsDF = alsModel \
    .itemFactors. \
    selectExpr('{0} AS user'.format(userID), 'id AS item')
userPredictedDF = alsModel \
    .transform(userItemsDF) \
    .select('item', 'prediction') \
    .orderBy('prediction', ascending=False) \
    .limit(topN)

ref:
https://www.safaribooksonline.com/library/view/advanced-analytics-with/9781491972946/ch03.html
https://www.safaribooksonline.com/library/view/building-a-recommendation/9781785282584/ch06s04.html
https://www.safaribooksonline.com/library/view/building-recommendation-engines/9781785884856/ch07s05.html

Recommend items for every user (a slightly fast way)

import numpy
from numpy import *

myModel = MatrixFactorizationModel.load(sc, 'BingBong')
m1 = myModel.productFeatures()
m2 = m1.map(lambda (product, feature): feature).collect()
m3 = matrix(m2).transpose()
pf = sc.broadcast(m3)
uf = myModel.userFeatures().coalesce(100)

# get predictions on all user
f1 = uf.map(lambda (userID, features): (userID, squeeze(asarray(matrix(array(features)) * pf.value))))

ref:
https://www.slideshare.net/SparkSummit/26-trillion-app-recomendations-using-100-lines-of-spark-code-ayman-farahat

Evaluate a binary classification

from pyspark.ml.evaluation import BinaryClassificationEvaluator

matrix = [
    (0.5, 1),
    (2.0, 1),
    (0.8, 1),
    (0.2, 0),
    (0.1, 0),
    (0.4, 0),
]
predictions = spark.createDataFrame(matrix, ['prediction', 'label'])
predictions = predictions.withColumn('prediction', predictions['prediction'].cast('double'))

evaluator = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='label', metricName='areaUnderROC')
evaluator.evaluate(predictions)

Calculate ranking metrics

from pyspark.mllib.evaluation import RankingMetrics
from pyspark.sql import Window
from pyspark.sql.functions import col, expr
import pyspark.sql.functions as F

k = 10

windowSpec = Window.partitionBy('user').orderBy(col('prediction').desc())
perUserPredictedItemsDF = outputDF \
    .select('user', 'item', 'prediction', F.rank().over(windowSpec).alias('rank')) \
    .where('rank <= {0}'.format(k)) \
    .groupBy('user') \
    .agg(expr('collect_list(item) as items'))
perUserPredictedItemsDF.show()
# +--------+--------------------+
# |    user|               items|
# +--------+--------------------+
# |    2142|[36560369, 197450...|
# |   47217|[40693501, 643554...|
# +--------+--------------------+

windowSpec = Window.partitionBy('from_user_id').orderBy(col('starred_at').desc())
perUserActualItemsDF = rawDF \
    .select('from_user_id', 'repo_id', 'starred_at', F.rank().over(windowSpec).alias('rank')) \
    .where('rank <= {0}'.format(k)) \
    .groupBy('from_user_id') \
    .agg(expr('collect_list(repo_id) as items')) \
    .withColumnRenamed('from_user_id', 'user')
# +--------+--------------------+
# |    user|               items|
# +--------+--------------------+
# |    2142|[29122050, 634846...|
# |   59990|[9820191, 8729416...|
# +--------+--------------------+

perUserItemsRDD = perUserPredictedItemsDF.join(perUserActualItemsDF, 'user') \
    .rdd \
    .map(lambda row: (row[1], row[2]))
rankingMetrics = RankingMetrics(perUserItemsRDD)

print(rankingMetrics.meanAveragePrecision)
print(rankingMetrics.precisionAt(k))
print(rankingMetrics.ndcgAt(k))

ref:
https://www.safaribooksonline.com/library/view/spark-the-definitive/9781491912201/ch19.html

Create a custom Transformer

from pyspark.ml import Transformer

class PredictionProcessor(Transformer):

    def _transform(self, predictedDF):
        nonNullDF = predictedDF.dropna(subset=['prediction', ])
        predictionDF = nonNullDF.withColumn('prediction', nonNullDF['prediction'].cast('double'))
        return predictionDF

ref:
http://stackoverflow.com/questions/32331848/create-a-custom-transformer-in-pyspark-ml

RankingMetrics

結果介於 0 ~ 1 之間,分數越大越好。

from pyspark.mllib.evaluation import RankingMetrics

predictionAndLabels = sc.parallelize([
    ([1, 2, 3, 4], [1, 2, 3, 4]),
    ([1, ], [1, 10]),
    ([6, 4, 2], [6, 2, 100, 8, 2, 55]),
])
metrics = RankingMetrics(predictionAndLabels)
metrics.ndcgAt(5)

ref:
https://spark.apache.org/docs/latest/mllib-evaluation-metrics.html#ranking-systems

Create a custom Evaluator

from pyspark import keyword_only
from pyspark.ml.evaluation import Evaluator
from pyspark.ml.param.shared import Param
from pyspark.mllib.evaluation import RankingMetrics
from pyspark.sql import Window
from pyspark.sql.functions import col
from pyspark.sql.functions import expr
import pyspark.sql.functions as F

class RankingEvaluator(Evaluator):

    @keyword_only
    def __init__(self, k=None):
        super(RankingEvaluator, self).__init__()
        self.k = Param(self, 'k', 'Top K')
        self._setDefault(k=30)
        kwargs = self.__init__._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, k=None):
        kwargs = self.setParams._input_kwargs
        return self._set(**kwargs)

    def isLargerBetter(self):
        return True

    def setK(self, value):
        self._paramMap[self.k] = value
        return self

    def getK(self):
        return self.getOrDefault(self.k)

    def _evaluate(self, outputDF):
        k = self.getK()

        windowSpec = Window.partitionBy('user').orderBy(col('prediction').desc())
        perUserPredictedItemsDF = outputDF \
            .select('user', 'item', 'prediction', F.rank().over(windowSpec).alias('rank')) \
            .where('rank <= {0}'.format(k)) \
            .groupBy('user') \
            .agg(expr('collect_list(item) as items'))

        windowSpec = Window.partitionBy('user').orderBy(col('starred_at').desc())
        perUserActualItemsDF = outputDF \
            .select('user', 'item', 'starred_at', F.rank().over(windowSpec).alias('rank')) \
            .where('rank <= {0}'.format(k)) \
            .groupBy('user') \
            .agg(expr('collect_list(item) as items'))

        perUserItemsRDD = perUserPredictedItemsDF.join(F.broadcast(perUserActualItemsDF), 'user', 'inner') \
            .rdd \
            .map(lambda row: (row[1], row[2]))
        rankingMetrics = RankingMetrics(perUserItemsRDD)
        metric = rankingMetrics.ndcgAt(k)
        return metric

Show best parameters from a CrossValidatorModel

metric_params_pairs = list(zip(cvModel.avgMetrics, cvModel.getEstimatorParamMaps()))
metric_params_pairs.sort(key=lambda x: x[0], reverse=True)
best_metric_params = metric_params_pairs[0][1]
for pair in metric_params_pairs:
    metric, params = pair
    print('metric', metric)
    for k, v in params.items():
        print(k.name, v)
    print('')
# metric 0.5273636632856705
# rank 50
# regParam 0.01
# maxIter 10
# alpha 1

ref:
https://stackoverflow.com/questions/31749593/how-to-extract-best-parameters-from-a-crossvalidatormodel
https://stackoverflow.com/questions/39529012/pyspark-get-all-parameters-of-models-created-with-paramgridbuilder

Show best parameters from a TrainValidationSplitModel

metric_params_pairs = list(zip(tvModel.validationMetrics, tvModel.getEstimatorParamMaps()))
metric_params_pairs.sort(key=lambda x: x[0], reverse=True)
for pair in metric_params_pairs:
    metric, params = pair
    print('metric', metric)
    for k, v in params.items():
        print(k.name, v)
    print('')
# metric 0.5385481418189484
# rank 50
# regParam 0.1
# maxIter 20
# alpha 1
Spark SQL cookbook (Python)

Spark SQL cookbook (Python)

Access SparkSession

from pyspark.sql import SparkSession

# get the default SparkSession instance
spark = SparkSession.builder.getOrCreate()

sc = spark.sparkContext
sc.setLogLevel('INFO')

Access custom configurations

$ spark-submit \
--master spark://localhost:7077 \
--properties-file my-properties.conf \
your_spark_app.py

# in my-properties.conf
# spark.myapp.db_host 192.168.100.10
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
print(sc.getConf().get('spark.myapp.db_host'))

ref:
https://stackoverflow.com/questions/31115881/how-to-load-java-properties-file-and-use-in-spark

Create a RDD

array = [
    1,
    5,
    7,
    2,
    3,
]
rdd = sc.parallelize(array)
df = rdd.toDF('int')
# +-----+
# |value|
# +-----+
# |    1|
# |    5|
# |    7|
# |    2|
# |    3|
# +-----+

Read data from MySQL

$ pyspark --packages "mysql:mysql-connector-java:5.1.41"
url = 'jdbc:mysql://127.0.0.1:3306/albedo'
properties = {'user': 'root', 'password': '123'}
spark.read.jdbc(url, 'app_repostarring', properties=properties)

ref:
https://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases
https://dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-configuration-properties.html

Write data to MySQL

CREATE TABLE `recommender_songrecommend` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `user_id` int(11) NOT NULL,
  `song_id` int(11) NOT NULL,
  `score` double NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1;
matrix = [
    (1, 2, 0.1),
    (3, 4, 0.23456),
    (5, 6, 7.89),
    (7, 8, -10.111213),
]
df = spark.createDataFrame(matrix, ['user_id','song_id', 'score'])

url = 'jdbc:mysql://192.168.11.200:3306/DATABASE?user=USER&password=PASSWORD&verifyServerCertificate=false&useSSL=false&rewriteBatchedStatements=true'
properties = {'driver': 'com.mysql.jdbc.Driver'}
df \
    .selectExpr('user AS user_id', 'item AS song_id', 'score') \
    .write.jdbc(url, table='recommender_songrecommend', mode='append', properties=properties)

ref:
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter.jdbc
https://stackoverflow.com/questions/2993251/jdbc-batch-insert-performance/10617768#10617768

Read data from SQLite

$ pyspark --packages "org.xerial:sqlite-jdbc:3.16.1"
from pyspark.sql.functions import lit
from pyspark.sql.types import IntegerType, StructField, StructType, TimestampType

props = {'driver': 'org.sqlite.JDBC', 'date_string_format': 'yyyy-MM-dd HH:mm:ss'}
df = spark.read.jdbc("jdbc:sqlite:db.sqlite3", "app_repostarring", properties=props)

df = df.where(df.stargazers_count >= min_stargazers_count)
df = df.select('from_user_id', 'repo_id', 'created_at')
df = df.toDF('user', 'item') 
df = df.withColumn('rating', lit(1))

schema = StructType([
    StructField('user', IntegerType(), nullable=False),
    StructField('item', IntegerType(), nullable=False),
    StructField('rating', IntegerType(), nullable=False),
    StructField('item_created_at', TimestampType(), nullable=False),
])
df = spark.createDataFrame(df.rdd, schema)

ref:
https://github.com/xerial/sqlite-jdbc

java.text.ParseException: Unparseable date: "2016-04-22 17:26:54"
https://github.com/xerial/sqlite-jdbc/issues/88

Read data from parquet

from pyspark.sql.utils import AnalysisException

raw_df_filepath = 'raw_df.parquet'

try:
    raw_df = spark.read.format('parquet').load(raw_df_filepath)
except AnalysisException as exc:
    if 'Path does not exist' in exc.desc:
        raw_df = load_raw_data()
        raw_df.write.format('parquet').save(raw_df_filepath)
    else:
        raise exc

ref:
https://community.hortonworks.com/articles/21303/write-read-parquet-file-in-spark.html

Create a DataFrame

matrix = [
    (1, 1, 1),
    (1, 2, 1),
    (2, 1, 0),
    (3, 1, 1),
    (3, 3, 1),
    (3, 4, 1),
    (4, 1, 0),
    (4, 2, 0),
    (5, 9, 1),
    (5, 5, 0),
]
df = spark.createDataFrame(matrix, ['user', 'item', 'rating'])

ref:
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.SparkSession.createDataFrame

Create a DataFrame with explicit schema

from pyspark.sql.types import *

matrix = [
    ('Alice', 0.5, 5.0),
    ('Bob',   0.2, 92.0),
    ('Tom',   0.0, 122.0),
    ('Zack',  0.1, 1.0),
]
schema = StructType([
    StructField('name', StringType(), nullable=False),
    StructField('prediction', DoubleType(), nullable=False),
    StructField('rating', DoubleType(), nullable=False)
])
df = spark.createDataFrame(matrix, schema)
df.printSchema()

new_df = spark.createDataFrame(someRDD, df.schema)

ref:
https://github.com/awesome-spark/spark-gotchas/blob/master/05_spark_sql_and_dataset_api.md

Create a nested schema

from pyspark.sql.types import ArrayType
from pyspark.sql.types import FloatType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import StructField
from pyspark.sql.types import StructType

recommendation_schema = StructType([
    StructField('item', IntegerType(), nullable=False),
    StructField('rating', FloatType(), nullable=False),
])

user_recommendations_schema = StructType([
    StructField('user', IntegerType(), nullable=False),
    StructField('recommendations', ArrayType(recommendation_schema), nullable=False),
])

matrix = [
    (1, [[100, 1.0], [200, 2.0]]),
    (2, [[300, 3.0]]),
]
df = spark.createDataFrame(matrix, user_recommendations_schema)

df.printSchema()
# root
 # |-- user: integer (nullable = false)
 # |-- recommendations: array (nullable = false)
 # |    |-- element: struct (containsNull = true)
 # |    |    |-- item: integer (nullable = false)
 # |    |    |-- rating: float (nullable = false)
 # |-- recommendations_count: integer (nullable = false)

df.show()
# +----+--------------------+---------------------+
# |user|     recommendations|recommendations_count|
# +----+--------------------+---------------------+
# |   1|[[100,1.0], [200,...|                    2|
# |   2|         [[300,3.0]]|                    1|
# +----+--------------------+---------------------+

ref:
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.types

Change schema of a DataFrame

from pyspark.sql.types import DoubleType, IntegerType, StructField, StructType

schema = StructType([
    StructField('user', IntegerType(), nullable=False),
    StructField('item', IntegerType(), nullable=False),
    StructField('rating', DoubleType(), nullable=False),
])
df = spark.createDataFrame(df.rdd, schema)

Get numbers of partitions

df.rdd.getNumPartitions()

Split a DataFrame into chunks (partitions)

def write_to_db(partial_rows):
    values_tuples = [(row.user, row.item, row.score, year, week) for row in chunk_rows]
    result = your_mysql_insert_func(values_tuples)
    return [result, ]

save_result = df \
    .repartition(400) \
    .rdd \
    .mapPartitions(write_to_db) \
    .collect()

ref:
https://stackoverflow.com/questions/24898700/batching-within-an-apache-spark-rdd-map
https://stackoverflow.com/questions/35370826/using-spark-for-sequential-row-by-row-processing-without-map-and-reduce

Show a DataFrame

# print the schema in a tree format
df.printSchema()

# print top 20 rows
df.show()

# print top 100 rows
df.show(100)

# calculate the descriptive statistics
df.describe().show()
df.describe(['like_count', ]).show()

ref:
http://spark.apache.org/docs/latest/sql-programming-guide.html#untyped-dataset-operations-aka-dataframe-operations

Create a column with a literal value

from pyspark.sql.functions import lit

df = df.withColumn('like', lit(1))

ref:
http://stackoverflow.com/questions/33681487/how-do-i-add-a-new-column-to-a-spark-dataframe-using-pyspark

Return a fraction of a DataFrame

fraction = {
    'u1': 0.5,
    'u2': 0.5,
    'u3': 0.5,
    'u4': 0.5,
    'u5': 0.5,
}
df.sampleBy('user', fraction).show()
# +----+----+----------+
# |user|item|prediction|
# +----+----+----------+
# |  u1|  i1|         1|
# |  u3|  i4|         4|
# |  u4|  i1|         1|
# |  u4|  i2|         3|
# |  u5|  i5|         5|
# +----+----+----------+

Show distinct values of a column

df.select('user').distinct().show()

ref:
https://www.analyticsvidhya.com/blog/2016/10/spark-dataframe-and-operations/

Rename columns

df.printSchema()
# root
# |-- from_user_id: integer (nullable = true)
 # |-- repo_id: integer (nullable = true)

df = df.toDF('user', 'item')
# or
df = df.withColumnRenamed('from_user_id', 'user').withColumnRenamed('repo_id', 'item')

Convert a column to double type

df = df.withColumn('prediction', df['prediction'].cast('double'))

ref:
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.Column.cast

Update a colume based on conditions

from pyspark.sql.functions import when

df = df.withColumn('label', when(df['prediction'] > 0.5, 1).otherwise(0))

ref:
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.when
http://stackoverflow.com/questions/34908448/spark-add-column-to-dataframe-conditionally

Drop columns from a DataFrame

predictions = predictions.dropna(subset=['prediction', S])

ref:
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.dropna

DataFrame subtract another DataFrame

matrix1 = [
    (1, 2, 123),
    (3, 4, 1),
    (5, 6, 45),
    (7, 8, 3424),
]
df1 = spark.createDataFrame(matrix1, ['user','item', 'play_count'])

matrix2 = [
    (3, 4),
    (5, 6),
    (9, 1),
    (7, 8),
    (1, 6),
    (1, 1),
]
df2 = spark.createDataFrame(matrix2, ['user','item'])

df2.subtract(df1.select('user', 'item')).show()
# +----+----+
# |user|item|
# +----+----+
# |   1|   1|
# |   1|   6|
# |   9|   1|
# +----+----+

# or

testing_rdd = df.rdd.subtract(training.rdd)
testing = spark.createDataFrame(testing_rdd, df.schema)

ref:
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.subtract

Convert a DataFrame column into a Python list

# list
popluar_items = [row['item'] for row in popular_df.select('item').collect()]

# set
all_items = {row.id for row in als_model.itemFactors.select('id').collect()}

Concatenate (merge) two DataFrames

full_df = df1.union(df2)

Convert a DataFrame to a Python dict

d = df.rdd.collectAsMap()
d['some_key']

Compute (approximate or exact) median of a numerical column

approximate_median = df.approxQuantile('count', [0.5, ], 0.25)
exact_median = df.approxQuantile('count', [0.5, ], 0.0)

maximum = df.approxQuantile('count', [1.0, ], 0.1)
minimum = df.approxQuantile('count', [0.0, ], 0.1)

ref:
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.approxQuantile

Find frequent items for columns

df = rating_df.freqItems(['item', ], support=0.01)
# +--------------------+
# |      item_freqItems|
# +--------------------+
# |[194512, 371798, ...|
# +--------------------+

# get the value of a DataFrame column
popular_items = df.collect()[0].item_freqItems

ref:
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.freqItems

Broadcast a value

bc_candidates = sc.broadcast(set([1, 2, 4, 5, 8]))
print(bc_candidates.value)
# {8, 1, 2, 4, 5}

bc_df = sc.broadcast(df.collect())
df = spark.createDataFrame(bc_df.value)

ref:
http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables

Broadcast a DataFrame in join

import pyspark.sql.functions as F

large_df.join(F.broadcast(small_df), 'some_key')

ref:
http://stackoverflow.com/questions/34053302/pyspark-and-broadcast-join-example
https://chapeau.freevariable.com/2014/09/improving-spark-application-performance.html

Cache a DataFrame

df.cache()

ref:
http://stackoverflow.com/questions/38056774/spark-cache-vs-broadcast

Show query execution plan

df.explain(extended=True)

Use SQL to query a DataFrame

props = {'driver': 'org.sqlite.JDBC', 'date_string_format': 'yyyy-MM-dd HH:mm:ss'}
df = spark.read.jdbc("jdbc:sqlite:db.sqlite3", "app_repostarring", properties=props)
df.createOrReplaceTempView('repo_stars')

query = 'SELECT DISTINCT repo_id AS item FROM repo_stars WHERE stargazers_count > 1000'
df2 = spark.sql(query)
df2.show()

query = """
SELECT
    from_user_id AS user,
    count(repo_id) AS count
FROM repo_stars
GROUP BY from_user_id
ORDER BY count DESC
"""
df = spark.sql(query)

params = {'top_n': top_n}
query = """
SELECT 
    repo_id AS item,
    MAX(stargazers_count) AS stars
FROM repo_stars
GROUP BY repo_id
ORDER BY stars DESC
LIMIT {top_n}
""".format(**params)
popular_df = spark.sql(query)

ref:
https://sparkour.urizone.net/recipes/using-sql-udf/

WHERE ... IN ...

from pyspark.sql.functions import col

item_ids = [1, 2, 5, 8]
raw_df \
    .where(col('repo_id').isin(item_ids)) \
    .select('repo_url') \
    .collectAsMap()

ORDER BY multiple columns

import pyspark.sql.functions as F

rating_df = raw_df \
    .selectExpr('from_user_id AS user', 'repo_id AS item', '1 AS rating', 'starred_at') \
    .orderBy('user', F.col('starred_at').desc())

Aggregate

df.agg(min('user'), max('user'), min('item'), max('item')).show()

max_value = user_star_count_df.agg(F.max('stars')).collect()[0]["max('stars')"]

SELECT COUNT(DISTINCT xxx) ...

import pyspark.sql.functions as F

matrix = [
    (1, 1, 1),
    (1, 2, 1),
    (2, 1, 0),
    (3, 1, 1),
    (3, 3, 1),
    (3, 4, 1),
    (4, 1, 0),
    (4, 2, 0),
    (5, 9, 1),
    (5, 5, 0),
]
df = spark.createDataFrame(matrix, ['user', 'item', 'rating'])
df.agg(F.countDistinct('user')).show()
# +--------------------+
# |count(DISTINCT user)|
# +--------------------+
# |                   5|
# +--------------------+

ref:
http://stackoverflow.com/questions/40888946/spark-dataframe-count-distinct-values-of-every-column

SELECT MAX(xxx) ... GROUP BY

df.groupBy('user').count().filter('count >= 4').show()

popular_df = raw_df \
    .groupBy('repo_id') \
    .agg(F.max('stargazers_count').alias('stars')) \
    .orderBy('stars', ascending=False) \
    .limit(top_n)

ref:
http://stackoverflow.com/questions/30616380/spark-how-to-count-number-of-records-by-key

SELECT COUNT() ... GROUP BY

prediction_df.groupBy('user').count().show()
# +------+-----+
# |  user|count|
# +------+-----+
# |649661|   15|
# |464340|   15|
# |468780|   15|
# |489233|   11|
# |455143|   14|
# +------+-----+

stargazers_count_df = rating_df \
    .groupBy('item') \
    .agg(F.count('user').alias('stargazers_count')) \
    .orderBy('stargazers_count', ascending=False)
# +--------+----------------+
# |    item|stargazers_count|
# +--------+----------------+
# | 3544424|             137|
# | 2126244|             120|
# | 7691631|             115|
# | 1362490|             112|
# |  943149|             112|
# +--------+----------------+

starred_count_df = rating_df \
    .groupBy('user') \
    .agg(F.count('item').alias('starred_count')) \
    .orderBy('starred_count', ascending=False)
# +-------+-------------+
# |   user|starred_count|
# +-------+-------------+
# |  48936|         7201|
# |4560482|         5898|
# |3382565|         3978|
# |  10652|         3586|
# |  31996|         3459|
# +-------+-------------+

You may want to use approx_count_distinct.

GROUP_CONCAT a column

from pyspark.sql.functions import expr

per_user_predictions_df = output_df \
  .orderBy(['user', 'prediction'], ascending=False) \
  .groupBy('user') \
  .agg(expr('collect_list(item) as items'))
# +--------+--------------------+
# |    user|               items|
# +--------+--------------------+
# |    2142|[36560369, 197450...|
# |   47217|[40693501, 643554...|
# +--------+--------------------+

ref:
http://stackoverflow.com/questions/31640729/spark-sql-replacement-for-mysql-group-concat-aggregate-function

GROUP_CONCAT multiple columns

from pyspark.sql.functions import col, collect_list, struct

matrix = [
    (1, 1, 0.1),
    (1, 2, 5.1),
    (1, 6, 0.0),
    (2, 6, 9.3),
    (3, 1, 0.54),
    (3, 5, 0.83),
    (4, 1, 0.65),
    (4, 4, 1.023),
]
df = spark.createDataFrame(matrix, ['user', 'item', 'prediction'])

df \
    .groupBy("user") \
    .agg(collect_list(struct(col('item'), col('prediction'))).alias("recommendations")) \
    .show(truncate=False)
# +----+---------------------------+
# |user|recommendations            |
# +----+---------------------------+
# |1   |[[1,0.1], [2,5.1], [6,0.0]]|
# |3   |[[1,0.54], [5,0.83]]       |
# |2   |[[6,9.3]]                  |
# |4   |[[1,0.65], [4,1.023]]      |
# +----+---------------------------+

ref:
https://stackoverflow.com/questions/37737843/aggregating-multiple-columns-with-custom-function-in-spark

SELECT ... RANK() OVER (PARTITION BY ... ORDER BY)

from pyspark.sql import Window
from pyspark.sql.functions import col
from pyspark.sql.functions import expr
import pyspark.sql.functions as F

window_spec = Window.partitionBy('from_user_id').orderBy(col('starred_at').desc())
per_user_actual_items_df = raw_df \
    .select('from_user_id', 'repo_id', 'starred_at', F.rank().over(window_spec).alias('rank')) \
    .where('rank <= 10') \
    .groupBy('from_user_id') \
    .agg(expr('collect_list(repo_id) as items')) \
    .withColumnRenamed('from_user_id', 'user')
# +--------+--------------------+
# |    user|               items|
# +--------+--------------------+
# |    2142|[29122050, 634846...|
# |   59990|[9820191, 8729416...|
# +--------+--------------------+

window_spec = Window.partitionBy('user').orderBy(col('prediction').desc())
per_user_predicted_items_df = output_df \
    .select('user', 'item', 'prediction', F.rank().over(window_spec).alias('rank')) \
    .where('rank <= 10') \
    .groupBy('user') \
    .agg(expr('collect_list(item) as items'))
# +--------+--------------------+
# |    user|               items|
# +--------+--------------------+
# |    2142|[36560369, 197450...|
# |   47217|[40693501, 643554...|
# +--------+--------------------+

ref:
https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
https://github.com/awesome-spark/spark-gotchas/blob/master/05_spark_sql_and_dataset_api.md#window-functions

Left anti join / Left excluding join

clean_df = rating_df.join(to_remove_items, 'item', 'left_anti')

ref:
https://www.codeproject.com/Articles/33052/Visual-Representation-of-SQL-Joins

Outer join

m1 = [
    (1, 1, 1),
    (1, 2, 1),
    (1, 4, 1),
    (2, 2, 1),
    (2, 3, 1),
    (3, 5, 1),
]
m1df = spark.createDataFrame(m1, ['user', 'item', 'rating'])
m1df = m1df.where('user = 1').alias('m1df')

m2 = [
    (1, 100),
    (2, 200),
    (3, 300),
    (4, 400),
    (5, 500),
    (6, 600),
]
m2df = spark.createDataFrame(m2, ['item', 'count'])
m2df = m2df.alias('m2df')

m1df.join(m2df, m1df.item == m2df.item, 'rightouter') \
.where('m1df.user IS NULL') \
.orderBy('m2df.count', ascending=False) \
.selectExpr('1 AS user', 'm2df.item', '0 AS rating') \
.show()
# +----+----+------+
# |user|item|rating|
# +----+----+------+
# |   1|   6|     0|
# |   1|   5|     0|
# |   1|   3|     0|
# +----+----+------+

ref:
https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-sql-joins.html

Cross join

df = m1df.select('user').distinct().crossJoin(m2df.select('item'))
df.show()

query = """
SELECT f1.user, f2.item, 0 AS rating
FROM f1
CROSS JOIN f2
"""
df = spark.sql(query)
df.show()

all_user_item_pair_df = als_model.userFactors.selectExpr('id AS user') \
    .crossJoin(alsModel.itemFactors.selectExpr('id AS item'))
# +----+----+
# |user|item|
# +----+----+
# |xxxx|oooo|
# +----+----+

ref:
http://stackoverflow.com/questions/5464131/finding-pairs-that-do-not-exist-in-a-different-table