Spark ML cookbook (Python)

Calculate percentage of sparsity of a user-item rating DataFrame

result = ratingDF.agg(F.count('rating'), F.countDistinct('user'), F.countDistinct('item')).collect()[0]
totalUserCount = result['count(DISTINCT user)']
totalItemCount = result['count(DISTINCT item)']
zonZeroRatingCount = result['count(rating)']

density = (zonZeroRatingCount / (totalUserCount * totalItemCount)) * 100
sparsity = 100 - density

Recommend items for single user

topN = 30
userID = 123

userItemsDF = alsModel \
    .itemFactors. \
    selectExpr('{0} AS user'.format(userID), 'id AS item')
userPredictedDF = alsModel \
    .transform(userItemsDF) \
    .select('item', 'prediction') \
    .orderBy('prediction', ascending=False) \
    .limit(topN)

ref:
https://www.safaribooksonline.com/library/view/advanced-analytics-with/9781491972946/ch03.html
https://www.safaribooksonline.com/library/view/building-a-recommendation/9781785282584/ch06s04.html
https://www.safaribooksonline.com/library/view/building-recommendation-engines/9781785884856/ch07s05.html

Recommend items for every user (a slightly fast way)

import numpy
from numpy import *

myModel = MatrixFactorizationModel.load(sc, 'BingBong')
m1 = myModel.productFeatures()
m2 = m1.map(lambda (product, feature): feature).collect()
m3 = matrix(m2).transpose()
pf = sc.broadcast(m3)
uf = myModel.userFeatures().coalesce(100)

# get predictions on all user
f1 = uf.map(lambda (userID, features): (userID, squeeze(asarray(matrix(array(features)) * pf.value))))

ref:
https://www.slideshare.net/SparkSummit/26-trillion-app-recomendations-using-100-lines-of-spark-code-ayman-farahat

Evaluate a binary classification

from pyspark.ml.evaluation import BinaryClassificationEvaluator

matrix = [
    (0.5, 1),
    (2.0, 1),
    (0.8, 1),
    (0.2, 0),
    (0.1, 0),
    (0.4, 0),
]
predictions = spark.createDataFrame(matrix, ['prediction', 'label'])
predictions = predictions.withColumn('prediction', predictions['prediction'].cast('double'))

evaluator = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='label', metricName='areaUnderROC')
evaluator.evaluate(predictions)

Calculate ranking metrics

from pyspark.mllib.evaluation import RankingMetrics
from pyspark.sql import Window
from pyspark.sql.functions import col, expr
import pyspark.sql.functions as F

k = 10

windowSpec = Window.partitionBy('user').orderBy(col('prediction').desc())
perUserPredictedItemsDF = outputDF \
    .select('user', 'item', 'prediction', F.rank().over(windowSpec).alias('rank')) \
    .where('rank <= {0}'.format(k)) \
    .groupBy('user') \
    .agg(expr('collect_list(item) as items'))
perUserPredictedItemsDF.show()
# +--------+--------------------+
# |    user|               items|
# +--------+--------------------+
# |    2142|[36560369, 197450...|
# |   47217|[40693501, 643554...|
# +--------+--------------------+

windowSpec = Window.partitionBy('from_user_id').orderBy(col('starred_at').desc())
perUserActualItemsDF = rawDF \
    .select('from_user_id', 'repo_id', 'starred_at', F.rank().over(windowSpec).alias('rank')) \
    .where('rank <= {0}'.format(k)) \
    .groupBy('from_user_id') \
    .agg(expr('collect_list(repo_id) as items')) \
    .withColumnRenamed('from_user_id', 'user')
# +--------+--------------------+
# |    user|               items|
# +--------+--------------------+
# |    2142|[29122050, 634846...|
# |   59990|[9820191, 8729416...|
# +--------+--------------------+

perUserItemsRDD = perUserPredictedItemsDF.join(perUserActualItemsDF, 'user') \
    .rdd \
    .map(lambda row: (row[1], row[2]))
rankingMetrics = RankingMetrics(perUserItemsRDD)

print(rankingMetrics.meanAveragePrecision)
print(rankingMetrics.precisionAt(k))
print(rankingMetrics.ndcgAt(k))

ref:
https://www.safaribooksonline.com/library/view/spark-the-definitive/9781491912201/ch19.html

Create a custom Transformer

from pyspark.ml import Transformer

class PredictionProcessor(Transformer):

    def _transform(self, predictedDF):
        nonNullDF = predictedDF.dropna(subset=['prediction', ])
        predictionDF = nonNullDF.withColumn('prediction', nonNullDF['prediction'].cast('double'))
        return predictionDF

ref:
http://stackoverflow.com/questions/32331848/create-a-custom-transformer-in-pyspark-ml

RankingMetrics

結果介於 0 ~ 1 之間,分數越大越好。

from pyspark.mllib.evaluation import RankingMetrics

predictionAndLabels = sc.parallelize([
    ([1, 2, 3, 4], [1, 2, 3, 4]),
    ([1, ], [1, 10]),
    ([6, 4, 2], [6, 2, 100, 8, 2, 55]),
])
metrics = RankingMetrics(predictionAndLabels)
metrics.ndcgAt(5)

ref:
https://spark.apache.org/docs/latest/mllib-evaluation-metrics.html#ranking-systems

Create a custom Evaluator

from pyspark import keyword_only
from pyspark.ml.evaluation import Evaluator
from pyspark.ml.param.shared import Param
from pyspark.mllib.evaluation import RankingMetrics
from pyspark.sql import Window
from pyspark.sql.functions import col
from pyspark.sql.functions import expr
import pyspark.sql.functions as F

class RankingEvaluator(Evaluator):

    @keyword_only
    def __init__(self, k=None):
        super(RankingEvaluator, self).__init__()
        self.k = Param(self, 'k', 'Top K')
        self._setDefault(k=30)
        kwargs = self.__init__._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, k=None):
        kwargs = self.setParams._input_kwargs
        return self._set(**kwargs)

    def isLargerBetter(self):
        return True

    def setK(self, value):
        self._paramMap[self.k] = value
        return self

    def getK(self):
        return self.getOrDefault(self.k)

    def _evaluate(self, outputDF):
        k = self.getK()

        windowSpec = Window.partitionBy('user').orderBy(col('prediction').desc())
        perUserPredictedItemsDF = outputDF \
            .select('user', 'item', 'prediction', F.rank().over(windowSpec).alias('rank')) \
            .where('rank <= {0}'.format(k)) \
            .groupBy('user') \
            .agg(expr('collect_list(item) as items'))

        windowSpec = Window.partitionBy('user').orderBy(col('starred_at').desc())
        perUserActualItemsDF = outputDF \
            .select('user', 'item', 'starred_at', F.rank().over(windowSpec).alias('rank')) \
            .where('rank <= {0}'.format(k)) \
            .groupBy('user') \
            .agg(expr('collect_list(item) as items'))

        perUserItemsRDD = perUserPredictedItemsDF.join(F.broadcast(perUserActualItemsDF), 'user', 'inner') \
            .rdd \
            .map(lambda row: (row[1], row[2]))
        rankingMetrics = RankingMetrics(perUserItemsRDD)
        metric = rankingMetrics.ndcgAt(k)
        return metric

Show best parameters from a CrossValidatorModel

metric_params_pairs = list(zip(cvModel.avgMetrics, cvModel.getEstimatorParamMaps()))
metric_params_pairs.sort(key=lambda x: x[0], reverse=True)
best_metric_params = metric_params_pairs[0][1]
for pair in metric_params_pairs:
    metric, params = pair
    print('metric', metric)
    for k, v in params.items():
        print(k.name, v)
    print('')
# metric 0.5273636632856705
# rank 50
# regParam 0.01
# maxIter 10
# alpha 1

ref:
https://stackoverflow.com/questions/31749593/how-to-extract-best-parameters-from-a-crossvalidatormodel
https://stackoverflow.com/questions/39529012/pyspark-get-all-parameters-of-models-created-with-paramgridbuilder

Show best parameters from a TrainValidationSplitModel

metric_params_pairs = list(zip(tvModel.validationMetrics, tvModel.getEstimatorParamMaps()))
metric_params_pairs.sort(key=lambda x: x[0], reverse=True)
for pair in metric_params_pairs:
    metric, params = pair
    print('metric', metric)
    for k, v in params.items():
        print(k.name, v)
    print('')
# metric 0.5385481418189484
# rank 50
# regParam 0.1
# maxIter 20
# alpha 1