Calculate the sparsity percentage of a user-item rating DataFrame
import pyspark.sql.functions as F
result = ratingDF.agg(F.count('rating'), F.countDistinct('user'), F.countDistinct('item')).collect()[0]
totalUserCount = result['count(DISTINCT user)']
totalItemCount = result['count(DISTINCT item)']
nonZeroRatingCount = result['count(rating)']
density = (nonZeroRatingCount / float(totalUserCount * totalItemCount)) * 100
sparsity = 100 - density
Recommend items for a single user
topN = 30
userID = 123
userItemsDF = alsModel.itemFactors \
    .selectExpr('{0} AS user'.format(userID), 'id AS item')
userPredictedDF = alsModel \
.transform(userItemsDF) \
.select('item', 'prediction') \
.orderBy('prediction', ascending=False) \
.limit(topN)
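If you are on Spark 2.3 or later, ALSModel also offers recommendForUserSubset() / recommendForAllUsers() as an alternative to scoring a hand-built user-item DataFrame; a minimal sketch, assuming the model was trained with userCol='user':
usersDF = spark.createDataFrame([(userID, )], ['user'])
# returns one row per user with an array of (item, rating) structs, ordered by predicted score
userRecommendationsDF = alsModel.recommendForUserSubset(usersDF, topN)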
ref:
https://www.safaribooksonline.com/library/view/advanced-analytics-with/9781491972946/ch03.html
https://www.safaribooksonline.com/library/view/building-a-recommendation/9781785282584/ch06s04.html
https://www.safaribooksonline.com/library/view/building-recommendation-engines/9781785884856/ch07s05.html
Recommend items for every user (a slightly faster way)
import numpy as np
from pyspark.mllib.recommendation import MatrixFactorizationModel
myModel = MatrixFactorizationModel.load(sc, 'BingBong')
m1 = myModel.productFeatures()
m2 = m1.map(lambda product_features: product_features[1]).collect()
m3 = np.matrix(m2).transpose()
pf = sc.broadcast(m3)
uf = myModel.userFeatures().coalesce(100)
# compute the predicted score of every product for every user
f1 = uf.map(lambda user_features: (user_features[0], np.squeeze(np.asarray(np.matrix(np.array(user_features[1])) * pf.value))))
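The snippet above stops at the raw score vectors; a follow-up sketch (not from the original) keeps only the top-N positions per user. Note these are column indices into the broadcast product matrix and still have to be mapped back to the product IDs carried by m1.
topN = 30
# top-N column indices per user, highest score first
f2 = f1.map(lambda user_scores: (user_scores[0], np.argsort(-user_scores[1])[:topN].tolist()))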
Evaluate binary classification predictions
from pyspark.ml.evaluation import BinaryClassificationEvaluator
scoresAndLabels = [
(0.5, 1),
(2.0, 1),
(0.8, 1),
(0.2, 0),
(0.1, 0),
(0.4, 0),
]
predictions = spark.createDataFrame(scoresAndLabels, ['prediction', 'label'])
predictions = predictions.withColumn('prediction', predictions['prediction'].cast('double'))
evaluator = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='label', metricName='areaUnderROC')
evaluator.evaluate(predictions)
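The same evaluator instance can also report the area under the precision-recall curve by overriding metricName at evaluate time:
evaluator.evaluate(predictions, {evaluator.metricName: 'areaUnderPR'})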
Calculate ranking metrics
from pyspark.mllib.evaluation import RankingMetrics
from pyspark.sql import Window
from pyspark.sql.functions import col, expr
import pyspark.sql.functions as F
k = 10
windowSpec = Window.partitionBy('user').orderBy(col('prediction').desc())
perUserPredictedItemsDF = outputDF \
.select('user', 'item', 'prediction', F.rank().over(windowSpec).alias('rank')) \
.where('rank <= {0}'.format(k)) \
.groupBy('user') \
.agg(expr('collect_list(item) as items'))
perUserPredictedItemsDF.show()
# +--------+--------------------+
# | user| items|
# +--------+--------------------+
# | 2142|[36560369, 197450...|
# | 47217|[40693501, 643554...|
# +--------+--------------------+
windowSpec = Window.partitionBy('from_user_id').orderBy(col('starred_at').desc())
perUserActualItemsDF = rawDF \
.select('from_user_id', 'repo_id', 'starred_at', F.rank().over(windowSpec).alias('rank')) \
.where('rank <= {0}'.format(k)) \
.groupBy('from_user_id') \
.agg(expr('collect_list(repo_id) as items')) \
.withColumnRenamed('from_user_id', 'user')
perUserActualItemsDF.show()
# +--------+--------------------+
# | user| items|
# +--------+--------------------+
# | 2142|[29122050, 634846...|
# | 59990|[9820191, 8729416...|
# +--------+--------------------+
perUserItemsRDD = perUserPredictedItemsDF.join(perUserActualItemsDF, 'user') \
.rdd \
.map(lambda row: (row[1], row[2]))
rankingMetrics = RankingMetrics(perUserItemsRDD)
print(rankingMetrics.meanAveragePrecision)
print(rankingMetrics.precisionAt(k))
print(rankingMetrics.ndcgAt(k))
ref:
https://www.safaribooksonline.com/library/view/spark-the-definitive/9781491912201/ch19.html
Create a custom Transformer
from pyspark.ml import Transformer
class PredictionProcessor(Transformer):
def _transform(self, predictedDF):
nonNullDF = predictedDF.dropna(subset=['prediction', ])
predictionDF = nonNullDF.withColumn('prediction', nonNullDF['prediction'].cast('double'))
return predictionDF
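A hypothetical usage sketch, assuming an ALS estimator named als (not defined above): chain the processor into a Pipeline so NaN cold-start predictions are dropped and cast before evaluation.
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[als, PredictionProcessor()])  # als is an assumed, pre-configured ALS estimator
pipelineModel = pipeline.fit(ratingDF)
predictedDF = pipelineModel.transform(ratingDF)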
ref:
http://stackoverflow.com/questions/32331848/create-a-custom-transformer-in-pyspark-ml
RankingMetrics
The score is between 0 and 1; the higher, the better.
from pyspark.mllib.evaluation import RankingMetrics
predictionAndLabels = sc.parallelize([
([1, 2, 3, 4], [1, 2, 3, 4]),
([1, ], [1, 10]),
([6, 4, 2], [6, 2, 100, 8, 2, 55]),
])
metrics = RankingMetrics(predictionAndLabels)
metrics.ndcgAt(5)
ref:
https://spark.apache.org/docs/latest/mllib-evaluation-metrics.html#ranking-systems
Create a custom Evaluator
from pyspark import keyword_only
from pyspark.ml.evaluation import Evaluator
from pyspark.ml.param.shared import Param
from pyspark.mllib.evaluation import RankingMetrics
from pyspark.sql import Window
from pyspark.sql.functions import col
from pyspark.sql.functions import expr
import pyspark.sql.functions as F
class RankingEvaluator(Evaluator):
@keyword_only
def __init__(self, k=None):
super(RankingEvaluator, self).__init__()
self.k = Param(self, 'k', 'Top K')
self._setDefault(k=30)
kwargs = self.__init__._input_kwargs
self.setParams(**kwargs)
@keyword_only
def setParams(self, k=None):
kwargs = self.setParams._input_kwargs
return self._set(**kwargs)
def isLargerBetter(self):
return True
def setK(self, value):
self._paramMap[self.k] = value
return self
def getK(self):
return self.getOrDefault(self.k)
def _evaluate(self, outputDF):
k = self.getK()
windowSpec = Window.partitionBy('user').orderBy(col('prediction').desc())
perUserPredictedItemsDF = outputDF \
.select('user', 'item', 'prediction', F.rank().over(windowSpec).alias('rank')) \
.where('rank <= {0}'.format(k)) \
.groupBy('user') \
.agg(expr('collect_list(item) as items'))
windowSpec = Window.partitionBy('user').orderBy(col('starred_at').desc())
perUserActualItemsDF = outputDF \
.select('user', 'item', 'starred_at', F.rank().over(windowSpec).alias('rank')) \
.where('rank <= {0}'.format(k)) \
.groupBy('user') \
.agg(expr('collect_list(item) as items'))
perUserItemsRDD = perUserPredictedItemsDF.join(F.broadcast(perUserActualItemsDF), 'user', 'inner') \
.rdd \
.map(lambda row: (row[1], row[2]))
rankingMetrics = RankingMetrics(perUserItemsRDD)
metric = rankingMetrics.ndcgAt(k)
return metric
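A hypothetical usage sketch with CrossValidator, assuming an ALS estimator als and a paramGrid built with ParamGridBuilder; the input DataFrame must carry the user, item and starred_at columns that _evaluate() reads from the transformed output.
from pyspark.ml.tuning import CrossValidator
cv = CrossValidator(
    estimator=als,  # assumed, pre-configured ALS estimator
    estimatorParamMaps=paramGrid,  # assumed ParamGridBuilder output
    evaluator=RankingEvaluator(k=10),
    numFolds=3)
cvModel = cv.fit(ratingDF)  # ratingDF must include the user, item and starred_at columns here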
Show best parameters from a CrossValidatorModel
metric_params_pairs = list(zip(cvModel.avgMetrics, cvModel.getEstimatorParamMaps()))
metric_params_pairs.sort(key=lambda x: x[0], reverse=True)
best_metric_params = metric_params_pairs[0][1]
for pair in metric_params_pairs:
metric, params = pair
print('metric', metric)
for k, v in params.items():
print(k.name, v)
print('')
# metric 0.5273636632856705
# rank 50
# regParam 0.01
# maxIter 10
# alpha 1
ref:
https://stackoverflow.com/questions/31749593/how-to-extract-best-parameters-from-a-crossvalidatormodel
https://stackoverflow.com/questions/39529012/pyspark-get-all-parameters-of-models-created-with-paramgridbuilder
Show best parameters from a TrainValidationSplitModel
metric_params_pairs = list(zip(tvModel.validationMetrics, tvModel.getEstimatorParamMaps()))
metric_params_pairs.sort(key=lambda x: x[0], reverse=True)
for pair in metric_params_pairs:
metric, params = pair
print('metric', metric)
for k, v in params.items():
print(k.name, v)
print('')
# metric 0.5385481418189484
# rank 50
# regParam 0.1
# maxIter 20
# alpha 1
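Both CrossValidatorModel and TrainValidationSplitModel also expose the model refit with the best parameters:
bestModel = tvModel.bestModel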