Scala is the first-class language for interacting with Apache Spark, but it can be difficult to learn. This article is mostly about Spark ML, the newer Spark machine learning library built on the DataFrame-based API.
Convert a String Categorical Feature into Numeric One
StringIndexer converts labels (categorical values) into numbers (0.0, 1.0, 2.0 and so on) ordered by label frequency, so the most frequent label gets 0. It can handle unseen labels through the optional handleInvalid strategies. StringIndexer's inputCol accepts string, numeric and boolean types.
val df1 = spark.createDataFrame(Seq(
(1, "Python"),
(2, "C++"),
(3, "C++"),
(4, "JavaScript"),
(5, "Python"),
(6, "Python"),
(7, "Go")
)).toDF("repo_id", "repo_language")
val df2 = spark.createDataFrame(Seq(
(1, "Python"),
(2, "C++"),
(3, "C++"),
(4, "JavaScript"),
(5, "Python"),
(6, "Python"),
(7, "Go"),
(8, "JavaScript"),
(9, "Brainfuck"),
(10, "Brainfuck"),
(11, "Red")
)).toDF("repo_id", "repo_language")
import org.apache.spark.ml.feature.StringIndexer
val stringIndexer = new StringIndexer()
.setInputCol("repo_language")
.setOutputCol("repo_language_index")
.setHandleInvalid("keep")
val stringIndexerModel = stringIndexer.fit(df1)
stringIndexerModel.labels
// Array[String] = Array(Python, C++, JavaScript, Go)
val indexedDF = stringIndexerModel.transform(df2)
indexedDF.show()
// +-------+-------------+-------------------+
// |repo_id|repo_language|repo_language_index|
// +-------+-------------+-------------------+
// | 1| Python| 0.0|
// | 2| C++| 1.0|
// | 3| C++| 1.0|
// | 4| JavaScript| 2.0|
// | 5| Python| 0.0|
// | 6| Python| 0.0|
// | 7| Go| 3.0|
// | 8| JavaScript| 2.0|
// | 9| Brainfuck| 4.0| <- previously unseen
// | 10| Brainfuck| 4.0| <- previously unseen
// | 11| Red| 4.0| <- previously unseen
// +-------+-------------+-------------------+
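To make the indexing rule concrete, here is a minimal plain-Scala sketch (no Spark involved) of frequency-ordered indexing with a "keep"-style fallback for unseen labels. The object name StringIndexerSketch and both helper functions are hypothetical, and breaking ties by first occurrence is an assumption of this sketch; Spark's own tie-breaking may differ between versions.

```scala
object StringIndexerSketch {
  // Order distinct labels by descending frequency.
  // Assumption of this sketch: ties are broken by first occurrence.
  def fitLabels(values: Seq[String]): List[String] =
    values.zipWithIndex
      .groupBy { case (label, _) => label }
      .toList
      .map { case (label, hits) => (label, hits.size, hits.map(_._2).min) }
      .sortBy { case (_, count, firstSeen) => (-count, firstSeen) }
      .map { case (label, _, _) => label }

  // Known labels map to their position; unseen labels share one extra index,
  // which mimics the handleInvalid = "keep" behaviour shown above.
  def indexOf(labels: List[String], value: String): Double = {
    val position = labels.indexOf(value)
    if (position >= 0) position.toDouble else labels.size.toDouble
  }
}

val sketchLabels = StringIndexerSketch.fitLabels(
  Seq("Python", "C++", "C++", "JavaScript", "Python", "Python", "Go"))
println(sketchLabels.mkString(", "))
println(StringIndexerSketch.indexOf(sketchLabels, "Brainfuck"))
```

Every unseen label lands on the same index (the number of known labels), which is why Brainfuck and Red share one value in the table above.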
ref:
https://spark.apache.org/docs/latest/ml-features.html#stringindexer
https://stackoverflow.com/questions/34681534/spark-ml-stringindexer-handling-unseen-labels
https://stackoverflow.com/questions/32277576/how-to-handle-categorical-features-with-spark-ml/32278617
Convert an Indexed Numeric Feature Back to the Original Categorical One
import org.apache.spark.ml.feature.IndexToString
val indexToString = new IndexToString()
.setInputCol("repo_language_index")
.setOutputCol("repo_language_ori")
val oriIndexedDF = indexToString.transform(indexedDF)
oriIndexedDF.show()
// +-------+-------------+-------------------+----------------------+
// |repo_id|repo_language|repo_language_index| repo_language_ori|
// +-------+-------------+-------------------+----------------------+
// | 1| Python| 0.0| Python|
// | 2| C++| 1.0| C++|
// | 3| C++| 1.0| C++|
// | 4| JavaScript| 2.0| JavaScript|
// | 5| Python| 0.0| Python|
// | 6| Python| 0.0| Python|
// | 7| Go| 3.0| Go|
// | 8| JavaScript| 2.0| JavaScript|
// | 9| Brainfuck| 4.0| __unknown| <- previously unseen
// | 10| Brainfuck| 4.0| __unknown| <- previously unseen
// | 11| Red| 4.0| __unknown| <- previously unseen
// +-------+-------------+-------------------+----------------------+
ref:
https://spark.apache.org/docs/latest/ml-features.html#indextostring
One-hot Encoding for Categorical Features
OneHotEncoder's input column only accepts numeric types. If you have string columns, you need to use StringIndexer to transform them into doubles; besides, StringIndexer is able to deal with unseen values properly. In my humble opinion, you should always apply StringIndexer before OneHotEncoder.
Be careful: the length of OneHotEncoder's output vector is determined by the maximum value in the input column, so you must apply OneHotEncoder to the union of the training set and the test set. Since OneHotEncoder does not accept an empty string as a name, you need to replace all empty strings with a placeholder, something like __empty.
import org.apache.spark.ml.feature.OneHotEncoder
val knownDF = spark.createDataFrame(Seq(
(2, "b"),
(3, "c"),
(0, "x"),
(6, "c"),
(4, "a"),
(1, "a"),
(5, "a")
)).toDF("category_1", "category_2")
val unseenDF = spark.createDataFrame(Seq(
(123, "e"),
(6, "c"),
(2, "b"),
(456, "c"),
(1, "a")
)).toDF("category_1", "category_2")
val knownOneHotDF = new OneHotEncoder()
.setDropLast(true)
.setInputCol("category_1")
.setOutputCol("category_1_one_hot")
.transform(knownDF)
knownOneHotDF.show()
// +----------+----------+------------------+
// |category_1|category_2|category_1_one_hot|
// +----------+----------+------------------+
// | 2| b| (6,[2],[1.0])|
// | 3| c| (6,[3],[1.0])|
// | 0| x| (6,[0],[1.0])|
// | 6| c| (6,[],[])|
// | 4| a| (6,[4],[1.0])|
// | 1| a| (6,[1],[1.0])|
// | 5| a| (6,[5],[1.0])|
// +----------+----------+------------------+
val unseenOneHotDF = new OneHotEncoder()
.setDropLast(true)
.setInputCol("category_1")
.setOutputCol("category_1_one_hot")
.transform(unseenDF)
unseenOneHotDF.show()
// +----------+----------+------------------+
// |category_1|category_2|category_1_one_hot|
// +----------+----------+------------------+
// | 123| e| (456,[123],[1.0])|
// | 6| c| (456,[6],[1.0])|
// | 2| b| (456,[2],[1.0])|
// | 456| c| (456,[],[])|
// | 1| a| (456,[1],[1.0])|
// +----------+----------+------------------+
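The two tables above have different vector sizes (6 versus 456) because the size follows the largest index in each input. A minimal plain-Scala sketch of that rule, with a hypothetical oneHot helper that returns the size and the active indices in the spirit of the sparse notation printed above (this is an illustration, not Spark's implementation):

```scala
object OneHotSketch {
  // Returns (vectorSize, activeIndices) for a single category index.
  // maxIndex is the largest index observed in the column being encoded.
  def oneHot(index: Int, maxIndex: Int, dropLast: Boolean = true): (Int, Seq[Int]) = {
    require(index >= 0 && index <= maxIndex, "index must be within [0, maxIndex]")
    val numCategories = maxIndex + 1
    // With dropLast, the last category is encoded as an all-zero vector.
    val size = if (dropLast) numCategories - 1 else numCategories
    val active = if (index < size) Seq(index) else Seq.empty[Int]
    (size, active)
  }
}

println(OneHotSketch.oneHot(2, 6))     // size 6, active index 2
println(OneHotSketch.oneHot(6, 6))     // size 6, no active index (dropped last category)
println(OneHotSketch.oneHot(123, 456)) // size 456, active index 123
```

Encoding the training set and the test set separately would therefore yield vectors of different lengths whenever their maximum indices differ.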
ref:
https://spark.apache.org/docs/latest/ml-features.html#onehotencoder
https://stackoverflow.com/questions/32277576/how-to-handle-categorical-features-with-spark-ml/40615508
https://stackoverflow.com/questions/33089781/spark-dataframe-handing-empty-string-in-onehotencoder
Create a Regular Expression Tokenizer
With setGaps(true), the pattern matches the delimiters between tokens; with setGaps(false), the pattern matches the tokens themselves.
import org.apache.spark.ml.feature.RegexTokenizer
import org.apache.spark.sql.functions._
val sentenceDF = spark.createDataFrame(Seq(
(1, "Hi, I heard about Spark"),
(2, "I wish Java could use case classes."),
(3, "Deep,Learning,models,are,state-of-the-art"),
(4, "fuck_yeah!!! No.")
)).toDF("id", "sentence")
val countTokensUDF = udf((words: Seq[String]) => words.length)
val regexTokenizer = new RegexTokenizer()
.setInputCol("sentence")
.setOutputCol("words")
.setPattern("""[\w\-_]+""").setGaps(false)
// .setPattern("""\W""").setGaps(true)
// .setPattern("""[,. ]""").setGaps(true)
val tokenizedDF = regexTokenizer.transform(sentenceDF)
val df = tokenizedDF
.select("sentence", "words")
.withColumn("count", countTokensUDF($"words"))
df.show(false)
// +-----------------------------------------+-----------------------------------------------+-----+
// |sentence |words |count|
// +-----------------------------------------+-----------------------------------------------+-----+
// |Hi, I heard about Spark |[hi, i, heard, about, spark] |5 |
// |I wish Java could use case classes. |[i, wish, java, could, use, case, classes] |7 |
// |Deep,Learning,models,are,state-of-the-art|[deep, learning, models, are, state-of-the-art]|5 |
// |fuck_yeah!!! No. |[fuck_yeah, no] |2 |
// +-----------------------------------------+-----------------------------------------------+-----+
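The difference between the two gaps modes can be sketched in plain Scala with the standard regex API: splitting on the pattern versus collecting its matches. The tokenize helper below is hypothetical, and the lowercasing only mirrors the lowercase tokens in the output above.

```scala
object RegexTokenizerSketch {
  def tokenize(text: String, pattern: String, gaps: Boolean): List[String] = {
    val regex = pattern.r
    val lowered = text.toLowerCase
    val tokens =
      if (gaps) regex.split(lowered).toList   // pattern describes the delimiters
      else regex.findAllIn(lowered).toList    // pattern describes the tokens
    tokens.filter(_.nonEmpty)                 // drop empty strings left by adjacent delimiters
  }
}

val sample = "Hi, I heard about Spark"
println(RegexTokenizerSketch.tokenize(sample, """[\w\-_]+""", gaps = false))
println(RegexTokenizerSketch.tokenize(sample, """\W""", gaps = true))
```

Both calls produce the same five tokens for this sentence; they diverge on inputs such as state-of-the-art, where the \W delimiter pattern also splits on hyphens.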
ref:
https://spark.apache.org/docs/latest/ml-features.html#tokenizer
Handle Comma-separated Categorical Column
You could use RegexTokenizer, CountVectorizer or HashingTF.
import org.apache.spark.ml.feature.{CountVectorizer, HashingTF, RegexTokenizer}
val df = spark.createDataFrame(Seq(
(1, "Action,Sci-Fi"),
(2, "Sci-Fi,Romance,Horror"),
(3, "War,Horror")
)).toDF("movie_id", "genres")
val regexTokenizer = new RegexTokenizer()
.setInputCol("genres")
.setOutputCol("genres_words")
.setPattern("""[\w\-_]+""").setGaps(false)
val wordsDF = regexTokenizer.transform(df)
val countModel = new CountVectorizer()
.setInputCol("genres_words")
.setOutputCol("genres_count_vector")
.setMinDF(1) // across the whole corpus, ignore terms that appear in fewer than n documents
.setMinTF(1) // within each document, ignore terms that appear fewer than n times
.fit(wordsDF)
val countVectorDF = countModel.transform(wordsDF)
// HashingTF might suffer from potential hash collisions
// it's good to use a power of two
val hashingTF = new HashingTF()
.setInputCol("genres_words")
.setOutputCol("genres_htf_vector")
.setNumFeatures(4)
val htfVectorDF = hashingTF.transform(countVectorDF)
htfVectorDF.show(false)
// +--------+---------------------+-------------------------+-------------------------+-------------------+
// |movie_id|genres |genres_words |genres_count_vector |genres_htf_vector |
// +--------+---------------------+-------------------------+-------------------------+-------------------+
// |1 |Action,Sci-Fi |[action, sci-fi] |(5,[0,3],[1.0,1.0]) |(4,[0],[2.0]) |
// |2 |Sci-Fi,Romance,Horror|[sci-fi, romance, horror]|(5,[0,1,4],[1.0,1.0,1.0])|(4,[0,2],[2.0,1.0])|
// |3 |War,Horror |[war, horror] |(5,[1,2],[1.0,1.0]) |(4,[0,2],[1.0,1.0])|
// +--------+---------------------+-------------------------+-------------------------+-------------------+
countModel.vocabulary
// Array(sci-fi, horror, action, romance, war)
ref:
https://spark.apache.org/docs/latest/ml-features.html#countvectorizer
https://spark.apache.org/docs/latest/ml-features.html#tf-idf
Train a Word2Vec Model
The output vector of any Word2Vec model is dense!
import org.apache.spark.ml.feature.Word2Vec
val df = spark.createDataFrame(Seq(
(1, "Hi I heard about Apache Spark".toLowerCase().split(" ")),
(2, "I wish Java could use case classes".toLowerCase().split(" ")),
(3, "Logistic regression models are neat".toLowerCase().split(" ")),
(4, "Apache Spark with Scala is awesome".toLowerCase().split(" ")),
(5, Array("中文", "嘛ㄟ通", "but", "必須", "另外", "分詞"))
)).toDF("id", "words")
val word2Vec = new Word2Vec()
.setInputCol("words")
.setOutputCol("words_w2v")
.setMaxIter(10)
.setVectorSize(3)
.setWindowSize(5)
.setMinCount(1)
val word2VecModel = word2Vec.fit(df)
word2VecModel.transform(df)
// +---+------------------------------------------+----------------------------------------------------------+
// |id |words |words_w2v |
// +---+------------------------------------------+----------------------------------------------------------+
// |1 |[hi, i, heard, about, apache, spark] |[-0.02013699459393,-0.02995631482274,0.047685102870066956]|
// |2 |[i, wish, java, could, use, case, classes]|[-0.05012317272186,0.01141336891094,-0.03742781743806387] |
// |3 |[logistic, regression, models, are, neat] |[-0.04678827972413,0.032994424477,0.0010566591750830413] |
// |4 |[apache, spark, with, scala, is, awesome] |[0.0265524153169,0.02056275321716,0.013326843579610188] |
// |5 |[中文, 嘛ㄟ通, but, 必須, 另外, 分詞] |[0.0571783996973,-0.02301329133545,0.013507421438892681] |
// +---+------------------------------------------+----------------------------------------------------------+
val df2 = spark.createDataFrame(Seq(
(6, Array("not-in-vocabularies", "neither", "no")),
(7, Array("spark", "hell_no", "not-in-vocabularies")),
(8, Array("hell_no", "not-in-vocabularies", "spark")),
(9, Array("not-in-vocabularies", "hell_no", "spark")),
(10, Array("no", "not-in-vocabularies", "spark")),
(11, Array("中文", "spark"))
)).toDF("id", "words")
word2VecModel.transform(df2)
// the order of words doesn't matter
// +---+-------------------------------------+-----------------------------------------------------------------+
// |id |words |words_w2v |
// +---+-------------------------------------+-----------------------------------------------------------------+
// |6 |[not-in-vocabularies, neither, no] |[0.0,0.0,0.0] |
// |7 |[spark, hell_no, not-in-vocabularies]|[0.0027440187210838,-0.0529780387878418,0.05730373660723368] |
// |8 |[hell_no, not-in-vocabularies, spark]|[0.0027440187210838,-0.0529780387878418,0.05730373660723368] |
// |9 |[not-in-vocabularies, hell_no, spark]|[0.0027440187210838,-0.0529780387878418,0.05730373660723368] |
// |10 |[no, not-in-vocabularies, spark] |[0.0027440187210838,-0.0529780387878418,0.05730373660723368] |
// |11 |[中文, spark] |[-0.009499748703092337,-0.018227852880954742,0.13357853144407272]|
// +---+-------------------------------------+-----------------------------------------------------------------+
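One reading that is consistent with the table above: the document vector is the sum of the vectors of the known words divided by the total number of words in the document, so word order is irrelevant and out-of-vocabulary words contribute only zeros. The plain-Scala sketch below illustrates that reading. The sentenceVector helper and the toy vocabulary are hypothetical, and this is an assumption inferred from the output, not a copy of Spark's code.

```scala
object SentenceVectorSketch {
  // Sum the vectors of in-vocabulary words, then divide by the total word count
  // (out-of-vocabulary words still count toward the denominator in this sketch).
  def sentenceVector(words: Seq[String], vocab: Map[String, Array[Double]], dim: Int): Array[Double] = {
    val sum = Array.fill(dim)(0.0)
    for (word <- words; vector <- vocab.get(word); i <- 0 until dim) sum(i) += vector(i)
    if (words.isEmpty) sum else sum.map(_ / words.size)
  }
}

// Hypothetical two-dimensional vocabulary with a single known word.
val toyVocab = Map("spark" -> Array(3.0, 6.0))
println(SentenceVectorSketch.sentenceVector(Seq("spark", "hell_no", "not-in-vocabularies"), toyVocab, 2).mkString(", "))
println(SentenceVectorSketch.sentenceVector(Seq("not-in-vocabularies", "neither", "no"), toyVocab, 2).mkString(", "))
```

Under this reading, rows 7 to 10 above are identical because each contains three words of which only spark is known, and row 6 is all zeros because none of its words are in the vocabulary.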
// anotherWord2VecModel: a different, already-trained Word2VecModel (not defined in this snippet)
anotherWord2VecModel.findSynonyms("developer", 5)
// +-----------+------------------+
// | word| similarity|
// +-----------+------------------+
// | dev| 0.881394624710083|
// |development|0.7730562090873718|
// | oier|0.6866029500961304|
// | develover|0.6720684766769409|
// | webdev|0.6582568883895874|
// +-----------+------------------+
ref:
https://spark.apache.org/docs/latest/ml-features.html#word2vec
Calculate the Pearson Correlation between Features
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Matrix
import org.apache.spark.ml.stat.Correlation
import org.apache.spark.sql.Row
val featureNames = Array("stargazers_count", "forks_count", "subscribers_count")
val vectorAssembler = new VectorAssembler()
.setInputCols(featureNames)
.setOutputCol("features")
val df = vectorAssembler.transform(rawRepoInfoDS)
val correlationDF = Correlation.corr(df, "features")
val Row(coeff: Matrix) = correlationDF.head
println(featureNames.mkString(", "))
println(coeff.toString)
// stargazers_count, forks_count, subscribers_count
// 1.0 0.5336901230713282 0.7664204175159971
// 0.5336901230713282 1.0 0.5414244966152617
// 0.7664204175159971 0.5414244966152617 1.0
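Each cell of the matrix above is the Pearson coefficient of one pair of columns. For reference, a minimal plain-Scala sketch of the coefficient for two equally long sequences; the pearson helper and the sample numbers are hypothetical and unrelated to the repository data used above.

```scala
object PearsonSketch {
  // Pearson correlation: covariance divided by the product of the standard deviations.
  def pearson(xs: Seq[Double], ys: Seq[Double]): Double = {
    require(xs.size == ys.size && xs.size > 1, "need two sequences of equal length > 1")
    val meanX = xs.sum / xs.size
    val meanY = ys.sum / ys.size
    val covariance = xs.zip(ys).map { case (x, y) => (x - meanX) * (y - meanY) }.sum
    val varianceX = xs.map(x => (x - meanX) * (x - meanX)).sum
    val varianceY = ys.map(y => (y - meanY) * (y - meanY)).sum
    covariance / math.sqrt(varianceX * varianceY)
  }
}

println(PearsonSketch.pearson(Seq(1.0, 2.0, 3.0), Seq(2.0, 4.0, 6.0))) // perfectly correlated
println(PearsonSketch.pearson(Seq(1.0, 2.0, 3.0), Seq(3.0, 2.0, 1.0))) // perfectly anti-correlated
```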
ref:
https://spark.apache.org/docs/latest/ml-statistics.html
DIMSUM
import org.apache.spark.ml.linalg.DenseVector
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
import org.apache.spark.sql.Row
val repoWordRDD = repoVectorDF
.select($"repo_id", $"text_w2v")
.rdd
.flatMap((row: Row) => {
val repoId = row.getInt(0)
val vector = row.getAs[DenseVector](1)
vector.toArray.zipWithIndex.map({
case (element, index) => MatrixEntry(repoId, index, element)
})
})
val repoWordMatrix = new CoordinateMatrix(repoWordRDD)
val wordRepoMatrix = repoWordMatrix.transpose
val repoSimilarityRDD = wordRepoMatrix
.toRowMatrix
.columnSimilarities(0.1)
.entries
// keep only the pairs whose similarity is at least 0.5
.filter((entry: MatrixEntry) => entry.value >= 0.5)
.map((entry: MatrixEntry) => (entry.i, entry.j, entry.value))
val repoSimilarityDF = spark.createDataFrame(repoSimilarityRDD).toDF("item_1", "item_2", "similarity")
repoSimilarityDF.show(false)
ref:
https://stackoverflow.com/questions/42455725/columnsimilarities-back-to-spark-data-frame
https://forums.databricks.com/questions/248/when-should-i-use-rowmatrixcolumnsimilarities.html
Train a Locality Sensitive Hashing (LSH) Model: Bucketed Random Projection LSH
To choose the value of bucketLength: if the input vectors are normalized, 1 to 10 times pow(numRecords, -1/inputDim) would be a reasonable value. For instance, Math.pow(334913.0, -1.0 / 200.0) = 0.9383726472256705.
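The rule of thumb above can be wrapped in a small plain-Scala helper. The names baseBucketLength and the multiplier argument are hypothetical; the formula itself is the one quoted above.

```scala
object BucketLengthSketch {
  // pow(numRecords, -1 / inputDim), optionally scaled by a factor between 1 and 10.
  def baseBucketLength(numRecords: Long, inputDim: Int, multiplier: Double = 1.0): Double = {
    require(numRecords > 0 && inputDim > 0, "numRecords and inputDim must be positive")
    multiplier * math.pow(numRecords.toDouble, -1.0 / inputDim)
  }
}

// Same numbers as the example above: 334913 records, 200 dimensions.
println(BucketLengthSketch.baseBucketLength(334913L, 200))
// A larger bucket from the upper end of the suggested 1-10x range.
println(BucketLengthSketch.baseBucketLength(334913L, 200, multiplier = 10.0))
```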
import org.apache.spark.ml.feature.BucketedRandomProjectionLSH
import org.apache.spark.ml.linalg.Vectors
val userDF = spark.createDataFrame(Seq(
(1, Vectors.sparse(6, Seq((0, -4.0), (1, 1.0), (2, 0.2)))),
(2, Vectors.sparse(6, Seq((0, 5.5), (1, -0.6), (2, 9.0)))),
(3, Vectors.sparse(6, Seq((1, 1.0), (2, 5.3), (4, 3.0)))),
(4, Vectors.sparse(6, Seq((1, 1.0), (2, 1.0), (4, 1.0)))),
(5, Vectors.sparse(6, Seq((2, 1.0), (5, -0.2)))),
(6, Vectors.sparse(6, Seq((0, 0.7)))),
(7, Vectors.sparse(6, Seq((1, 0.3), (2, 1.0))))
)).toDF("user_id", "features")
val repoDF = spark.createDataFrame(Seq(
(11, Vectors.sparse(6, Seq((0, 1.0), (1, 1.0), (2, 1.0), (3, 1.0), (4, 1.0), (5, 1.0)))),
(12, Vectors.sparse(6, Seq((0, 9.0), (1, -2.0), (2, -21.0), (3, 9.0), (4, 1.0), (5, 9.0)))),
(13, Vectors.sparse(6, Seq((0, 1.0), (1, 1.0), (2, -3.0), (3, 3.0), (4, 7.0), (5, 9.0)))),
(14, Vectors.sparse(6, Seq((0, 1.0), (1, 1.0), (2, -3.0)))),
(15, Vectors.sparse(6, Seq((1, 1.0), (2, 1.0))))
)).toDF("repo_id", "features")
val lsh = new BucketedRandomProjectionLSH()
.setBucketLength(0.6812920690579612)
.setNumHashTables(4)
.setInputCol("features")
.setOutputCol("hashes")
val lshModel = lsh.fit(repoDF)
val hashedUserDF = lshModel.transform(userDF)
val hashedRepoDF = lshModel.transform(repoDF)
hashedRepoDF.show(false)
// +-------+----------------------------------------------+--------------------------------+
// |repo_id|features |hashes |
// +-------+----------------------------------------------+--------------------------------+
// |11 |(6,[0,1,2,3,4,5],[1.0,1.0,1.0,1.0,1.0,1.0]) |[[1.0], [-2.0], [-1.0], [-1.0]] |
// |12 |(6,[0,1,2,3,4,5],[9.0,-2.0,-21.0,9.0,1.0,9.0])|[[21.0], [-28.0], [18.0], [0.0]]|
// |13 |(6,[0,1,2,3,4,5],[1.0,1.0,-3.0,3.0,7.0,9.0]) |[[4.0], [-10.0], [6.0], [-3.0]] |
// |14 |(6,[0,1,2],[1.0,1.0,-3.0]) |[[2.0], [-3.0], [2.0], [1.0]] |
// |15 |(6,[1,2],[1.0,1.0]) |[[-1.0], [0.0], [-2.0], [0.0]] |
// +-------+----------------------------------------------+--------------------------------+
val similarDF = lshModel
.approxSimilarityJoin(hashedUserDF, hashedRepoDF, 10.0, "distance")
.select($"datasetA.user_id".alias("user_id"), $"datasetB.repo_id".alias("repo_id"), $"distance")
.orderBy($"user_id", $"distance".asc)
similarDF.show(false)
// +-------+-------+------------------+
// |user_id|repo_id|distance |
// +-------+-------+------------------+
// |1 |15 |4.079215610874228 |
// |3 |15 |5.243090691567332 |
// |4 |15 |1.0 |
// |4 |11 |1.7320508075688772|
// |5 |15 |1.019803902718557 |
// |5 |11 |2.33238075793812 |
// |6 |15 |1.57797338380595 |
// |7 |15 |0.7 |
// |7 |11 |2.118962010041709 |
// +-------+-------+------------------+
val userVector = Vectors.sparse(6, Seq((0, 1.5), (1, 0.8), (2, 2.0)))
val singleSimilarDF = lshModel
.approxNearestNeighbors(hashedRepoDF, userVector, 5, "distance")
.select($"repo_id", $"features", $"distance")
singleSimilarDF.show(false)
// +-------+----------------------------------------------+------------------+
// |repo_id|features |distance |
// +-------+----------------------------------------------+------------------+
// |15 |(6,[1,2],[1.0,1.0]) |1.8138357147217055|
// |12 |(6,[0,1,2,3,4,5],[9.0,-2.0,-21.0,9.0,1.0,9.0])|27.49709075520536 |
// +-------+----------------------------------------------+------------------+
The problem with approxSimilarityJoin() is that you can't control the number of generated items; the disadvantage of approxNearestNeighbors() is that you have to iterate over all users manually to find similar items. Moreover, both methods can easily run into the infamous java.lang.OutOfMemoryError.
ref:
https://spark.apache.org/docs/latest/ml-features.html#locality-sensitive-hashing
Train a Locality Sensitive Hashing (LSH) Model: MinHash LSH
MinHash LSH treats the input as a binary vector; that is, all non-zero values (including negative values) are treated as 1. As a result, a Word2Vec vector is not an appropriate input for MinHash LSH.
import org.apache.spark.ml.feature.MinHashLSH
import org.apache.spark.ml.linalg.Vectors
val userDF = spark.createDataFrame(Seq(
(1, Vectors.sparse(6, Seq((0, -4.0), (1, 1.0), (2, 0.2)))),
(2, Vectors.sparse(6, Seq((0, 5.5), (1, -0.6), (2, 9.0)))),
(3, Vectors.sparse(6, Seq((1, 1.0), (2, 5.3), (4, 3.0)))),
(4, Vectors.sparse(6, Seq((1, 1.0), (2, 1.0), (4, 1.0)))),
(5, Vectors.sparse(6, Seq((2, 1.0), (5, -0.2)))),
(6, Vectors.sparse(6, Seq((2, 0.7)))),
(7, Vectors.sparse(6, Seq((3, 0.3), (5, 1.0))))
)).toDF("user_id", "features")
val repoDF = spark.createDataFrame(Seq(
(11, Vectors.sparse(6, Seq((1, 1.0), (3, 1.0), (5, 1.0)))),
(12, Vectors.sparse(6, Seq((2, 1.0), (3, 1.0), (5, 1.0)))),
(13, Vectors.sparse(6, Seq((1, 1.0), (2, 1.0), (4, 1.0))))
)).toDF("repo_id", "features")
val lsh = new MinHashLSH()
.setNumHashTables(4)
.setInputCol("features")
.setOutputCol("hashes")
val lshModel = lsh.fit(userDF)
val hashedUserDF = lshModel.transform(userDF)
val hashedRepoDF = lshModel.transform(repoDF)
hashedUserDF.show(false)
// user 1 and 2 have the same hashed vector
// user 3 and 4 have the same hashed vector
// +-------+--------------------------+-----------------------------------------------------------------------+
// |user_id|features |hashes |
// +-------+--------------------------+-----------------------------------------------------------------------+
// |1 |(6,[0,1,2],[-4.0,1.0,0.2])|[[-2.031299587E9], [-1.974869772E9], [-1.974047307E9], [4.95314097E8]] |
// |2 |(6,[0,1,2],[5.5,-0.6,9.0])|[[-2.031299587E9], [-1.974869772E9], [-1.974047307E9], [4.95314097E8]] |
// |3 |(6,[1,2,4],[1.0,5.3,3.0]) |[[-2.031299587E9], [-1.974869772E9], [-1.230128022E9], [8.7126731E8]] |
// |4 |(6,[1,2,4],[1.0,1.0,1.0]) |[[-2.031299587E9], [-1.974869772E9], [-1.230128022E9], [8.7126731E8]] |
// |5 |(6,[2,5],[1.0,-0.2]) |[[-2.031299587E9], [-1.758749518E9], [-4.86208737E8], [-1.919887134E9]]|
// |6 |(6,[2],[0.7]) |[[-2.031299587E9], [-1.758749518E9], [-4.86208737E8], [1.247220523E9]] |
// |7 |(6,[3,5],[0.3,1.0]) |[[-1.278435698E9], [-1.542629264E9], [2.57710548E8], [-1.919887134E9]] |
// +-------+--------------------------+-----------------------------------------------------------------------+
val userSimilarRepoDF = lshModel
.approxSimilarityJoin(hashedUserDF, hashedRepoDF, 0.6, "distance")
.select($"datasetA.user_id".alias("user_id"), $"datasetB.repo_id".alias("repo_id"), $"distance")
.orderBy($"user_id", $"distance".asc)
userSimilarRepoDF.show(false)
// +-------+-------+-------------------+
// |user_id|repo_id|distance |
// +-------+-------+-------------------+
// |1 |13 |0.5 |
// |2 |13 |0.5 |
// |3 |13 |0.0 |
// |4 |13 |0.0 |
// |5 |12 |0.33333333333333337|
// |7 |12 |0.33333333333333337|
// |7 |11 |0.33333333333333337|
// +-------+-------+-------------------+
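The distance column above matches the Jaccard distance between the sets of non-zero indices of the two vectors. A minimal plain-Scala sketch that reproduces a few of those values (the helper names are hypothetical, and the exact Jaccard distance is computed here rather than the approximate hashing that LSH uses to find candidate pairs):

```scala
object JaccardSketch {
  // Only the positions of non-zero entries matter; the values themselves are ignored.
  def nonZeroIndices(entries: Seq[(Int, Double)]): Set[Int] =
    entries.collect { case (index, value) if value != 0.0 => index }.toSet

  // Jaccard distance = 1 - |A intersect B| / |A union B|
  def jaccardDistance(a: Set[Int], b: Set[Int]): Double = {
    require(a.nonEmpty || b.nonEmpty, "at least one set must be non-empty")
    1.0 - a.intersect(b).size.toDouble / a.union(b).size.toDouble
  }
}

val user1 = JaccardSketch.nonZeroIndices(Seq((0, -4.0), (1, 1.0), (2, 0.2)))
val user3 = JaccardSketch.nonZeroIndices(Seq((1, 1.0), (2, 5.3), (4, 3.0)))
val repo13 = JaccardSketch.nonZeroIndices(Seq((1, 1.0), (2, 1.0), (4, 1.0)))
println(JaccardSketch.jaccardDistance(user1, repo13)) // shares 2 of 4 distinct indices
println(JaccardSketch.jaccardDistance(user3, repo13)) // identical index sets
```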
Train a Logistic Regression Model
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.Vectors
val training = spark.createDataFrame(Seq(
(1.0, Vectors.dense(1.0, 2.5, 0.0, 0.0)),
(1.0, Vectors.dense(0.1, 9.0, 0.0, 0.0)),
(1.0, Vectors.dense(0.0, 0.0, 1.0, 0.0)),
(0.0, Vectors.dense(0.0, 0.0, 2.0, 9.0)),
(0.0, Vectors.dense(1.0, 0.0, 0.0, 5.0))
)).toDF("label", "features")
val lr = new LogisticRegression()
.setMaxIter(100)
.setRegParam(0.0)
.setElasticNetParam(0.0)
.setFamily("binomial")
.setFeaturesCol("features")
.setLabelCol("label")
lr.explainParams()
val lrModel = lr.fit(training)
println(s"Coefficients: ${lrModel.coefficients}")
// [2.0149015925419,2.694173163503675,9.547978766053463,-5.592221425156231]
println(s"Intercept: ${lrModel.intercept}")
// 8.552229795281482
// `test` is a DataFrame with a "features" column (not defined in this snippet)
val result = lrModel.transform(test)
import org.apache.spark.ml.classification.BinaryLogisticRegressionSummary
val binarySummary = lrModel.summary.asInstanceOf[BinaryLogisticRegressionSummary]
println(s"Area Under ROC: ${binarySummary.areaUnderROC}")
ref:
https://spark.apache.org/docs/latest/ml-classification-regression.html#logistic-regression
https://spark.apache.org/docs/latest/mllib-linear-methods.html#logistic-regression
Evaluate a Binary Classification Model
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.linalg.Vectors
val df = spark.createDataFrame(Seq(
(Vectors.dense(0.0, 2.5), 1.0), // correct
(Vectors.dense(1.0, 4.1), 1.0), // correct
(Vectors.dense(9.2, 1.1), 0.0), // correct
(Vectors.dense(1.0, 0.1), 0.0), // correct
(Vectors.dense(5.0, 0.5), 1.0) // incorrect
)).toDF("rawPrediction", "starring")
val evaluator = new BinaryClassificationEvaluator()
.setMetricName("areaUnderROC")
.setRawPredictionCol("rawPrediction")
.setLabelCol("starring")
val metric = evaluator.evaluate(df)
// 0.8333333333333333
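The value 0.8333 can be checked by hand using the pairwise-ranking interpretation of AUC: the fraction of (positive, negative) pairs in which the positive example receives the higher score. The plain-Scala sketch below assumes the second element of each rawPrediction vector is the score for the positive class; the areaUnderROC helper is hypothetical and is not how Spark computes the metric internally.

```scala
object AucSketch {
  // scoresAndLabels: (score for the positive class, label in {0.0, 1.0})
  def areaUnderROC(scoresAndLabels: Seq[(Double, Double)]): Double = {
    val positives = scoresAndLabels.filter(_._2 == 1.0).map(_._1)
    val negatives = scoresAndLabels.filter(_._2 == 0.0).map(_._1)
    require(positives.nonEmpty && negatives.nonEmpty, "need both classes")
    // A correctly ordered pair counts 1, a tie counts 0.5, a wrongly ordered pair counts 0.
    val wins = for (p <- positives; n <- negatives)
      yield if (p > n) 1.0 else if (p == n) 0.5 else 0.0
    wins.sum / (positives.size * negatives.size)
  }
}

// Second vector element and label of each row in the DataFrame above.
val scored = Seq((2.5, 1.0), (4.1, 1.0), (1.1, 0.0), (0.1, 0.0), (0.5, 1.0))
println(AucSketch.areaUnderROC(scored))
```

Five of the six positive-negative pairs are ordered correctly; only the positive row with score 0.5 ranks below the negative row with score 1.1.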
Train an ALS Model
import org.apache.spark.ml.recommendation.ALS
val df = spark.createDataFrame(Seq(
(1, 1, 12),
(1, 2, 90),
(1, 4, 4),
(2, 4, 1),
(3, 5, 8)
)).toDF("user", "item", "rating")
val als = new ALS()
.setImplicitPrefs(true)
.setRank(5)
.setRegParam(0.5)
.setAlpha(40)
.setMaxIter(10)
.setSeed(42)
.setColdStartStrategy("drop")
val alsModel = als.fit(df)
val predictionDF = alsModel.transform(df)
// +----+----+------+----------+
// |user|item|rating|prediction|
// +----+----+------+----------+
// | 1| 1| 12| 0.9988487|
// | 3| 5| 8| 0.9984464|
// | 1| 4| 4|0.99887615|
// | 2| 4| 1| 0.9921428|
// | 1| 2| 90| 0.9997897|
// +----+----+------+----------+
predictionDF.printSchema()
// root
// |-- user: integer (nullable = false)
// |-- item: integer (nullable = false)
// |-- rating: integer (nullable = false)
// |-- prediction: float (nullable = false)
val userRecommendationsDF = alsModel.recommendForAllUsers(15)
// +----+-----------------------------------------------------------------+
// |user|recommendations |
// +----+-----------------------------------------------------------------+
// |1 |[[2,0.9997897], [4,0.9988761], [1,0.9988487], [5,0.0]] |
// |3 |[[5,0.9984464], [1,2.9802322E-8], [2,0.0], [4,0.0]] |
// |2 |[[4,0.9921428], [2,0.10759391], [1,0.10749264], [5,1.4901161E-8]]|
// +----+-----------------------------------------------------------------+
userRecommendationsDF.printSchema()
// root
// |-- user: integer (nullable = false)
// |-- recommendations: array (nullable = true)
// | |-- element: struct (containsNull = true)
// | | |-- item: integer (nullable = true)
// | | |-- rating: float (nullable = true)
ref:
https://spark.apache.org/docs/latest/ml-collaborative-filtering.html
Save and Load an ALS Model
import org.apache.hadoop.mapred.InvalidInputException
import org.apache.spark.ml.recommendation.{ALS, ALSModel}
val alsModelSavePath = "./spark-data/20170902/alsModel.parquet"
val alsModel: ALSModel = try {
ALSModel.load(alsModelSavePath)
} catch {
case e: InvalidInputException => {
if (e.getMessage().contains("Input path does not exist")) {
val als = new ALS()
.setImplicitPrefs(true)
.setRank(100)
.setRegParam(0.5)
.setAlpha(40)
.setMaxIter(22)
.setSeed(42)
.setColdStartStrategy("drop")
.setUserCol("user_id")
.setItemCol("repo_id")
.setRatingCol("starring")
val alsModel = als.fit(rawRepoStarringDS)
alsModel.save(alsModelSavePath)
alsModel
} else {
throw e
}
}
}
Create a Custom Transformer
package ws.vinta.albedo.transformers
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.{DoubleParam, Param, ParamMap}
import org.apache.spark.ml.util.{DefaultParamsWritable, Identifiable}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import scala.collection.mutable
class NegativeBalancer(override val uid: String, val bcPopularItems: Broadcast[mutable.LinkedHashSet[Int]])
extends Transformer with DefaultParamsWritable {
def this(bcPopularItems: Broadcast[mutable.LinkedHashSet[Int]]) = {
this(Identifiable.randomUID("negativeBalancer"), bcPopularItems)
}
val userCol = new Param[String](this, "userCol", "Name of the column containing users")
def getUserCol: String = $(userCol)
def setUserCol(value: String): this.type = set(userCol, value)
setDefault(userCol -> "user")
val itemCol = new Param[String](this, "itemCol", "Name of the column containing items")
def getItemCol: String = $(itemCol)
def setItemCol(value: String): this.type = set(itemCol, value)
setDefault(itemCol -> "item")
val labelCol = new Param[String](this, "labelCol", "Name of the column containing labels")
def getLabelCol: String = $(labelCol)
def setLabelCol(value: String): this.type = set(labelCol, value)
setDefault(labelCol -> "label")
val negativeValue = new DoubleParam(this, "negativeValue", "Value assigned to negative samples")
def getNegativeValue: Double = $(negativeValue)
def setNegativeValue(value: Double): this.type = set(negativeValue, value)
setDefault(negativeValue -> 0.0)
val negativePositiveRatio = new DoubleParam(this, "negativePositiveRatio", "Ratio of negative samples to positive samples")
def getNegativePositiveRatio: Double = $(negativePositiveRatio)
def setNegativePositiveRatio(value: Double): this.type = set(negativePositiveRatio, value)
setDefault(negativePositiveRatio -> 1.0)
override def transformSchema(schema: StructType): StructType = {
Map($(userCol) -> IntegerType, $(itemCol) -> IntegerType, $(labelCol) -> DoubleType)
.foreach{
case(columnName: String, expectedDataType: DataType) => {
val actualDataType = schema(columnName).dataType
require(actualDataType.equals(expectedDataType), s"Column $columnName must be of type $expectedDataType but was actually $actualDataType.")
}
}
schema
}
override def transform(dataset: Dataset[_]): DataFrame = {
transformSchema(dataset.schema)
val popularItems: mutable.LinkedHashSet[Int] = this.bcPopularItems.value
val emptyItemSet = new mutable.HashSet[Int]
val addToItemSet = (itemSet: mutable.HashSet[Int], item: Int) => itemSet += item
val mergeItemSets = (set1: mutable.HashSet[Int], set2: mutable.HashSet[Int]) => set1 ++= set2
val getUserNegativeItems = (userItemsPair: (Int, mutable.HashSet[Int])) => {
val (user, positiveItems) = userItemsPair
val negativeItems = popularItems.diff(positiveItems)
val requiredNegativeItemsCount = (positiveItems.size * this.getNegativePositiveRatio).toInt
(user, negativeItems.slice(0, requiredNegativeItemsCount))
}
val expandNegativeItems = (userItemsPair: (Int, mutable.LinkedHashSet[Int])) => {
val (user, negativeItems) = userItemsPair
negativeItems.map({(user, _, $(negativeValue))})
}
import dataset.sparkSession.implicits._
// TODO: this currently assumes the incoming dataset contains only positive samples; datasets that also contain negative samples may need to be handled later
val negativeDF = dataset
.select($(userCol), $(itemCol))
.rdd
.map({
case Row(user: Int, item: Int) => (user, item)
})
.aggregateByKey(emptyItemSet)(addToItemSet, mergeItemSets)
.map(getUserNegativeItems)
.flatMap(expandNegativeItems)
.toDF($(userCol), $(itemCol), $(labelCol))
dataset.select($(userCol), $(itemCol), $(labelCol)).union(negativeDF)
}
override def copy(extra: ParamMap): this.type = {
defaultCopy(extra)
}
}
ref:
https://www.safaribooksonline.com/library/view/high-performance-spark/9781491943199/ch09.html#extending_spark_ml
https://stackoverflow.com/questions/40615713/how-to-write-a-custom-transformer-in-mllib
https://issues.apache.org/jira/browse/SPARK-17048
Create a Custom Evaluator
package ws.vinta.albedo.evaluators
import org.apache.spark.ml.evaluation.Evaluator
import org.apache.spark.ml.param.{Param, ParamMap}
import org.apache.spark.ml.util.{DefaultParamsWritable, Identifiable}
import org.apache.spark.mllib.evaluation.RankingMetrics
import org.apache.spark.sql.{DataFrame, Dataset, Row}
class RankingEvaluator(override val uid: String, val userActualItemsDF: DataFrame)
extends Evaluator with DefaultParamsWritable {
def this(userActualItemsDF: DataFrame) = {
this(Identifiable.randomUID("rankingEvaluator"), userActualItemsDF)
}
val metricName = new Param[String](this, "metricName", "Evaluation metric")
def getMetricName: String = $(metricName)
def setMetricName(value: String): this.type = set(metricName, value)
setDefault(metricName -> "ndcg@k")
val k = new Param[Int](this, "k", "Only evaluate the ranking of the top k items")
def getK: Int = $(k)
def setK(value: Int): this.type = set(k, value)
setDefault(k -> 15)
override def isLargerBetter: Boolean = $(metricName) match {
case "map" => true
case "ndcg@k" => true
case "precision@k" => true
}
override def evaluate(dataset: Dataset[_]): Double = {
import dataset.sparkSession.implicits._
val userPredictedItemsDF = dataset.select($"user_id", $"recommendations.repo_id".alias("items"))
val bothItemsRDD = userPredictedItemsDF.join(userActualItemsDF, Seq("user_id"))
.select(userPredictedItemsDF.col("items"), userActualItemsDF.col("items"))
.rdd
.map((row: Row) => {
// Row(userPredictedItems, userActualItems)
(row(0).asInstanceOf[Seq[Int]].toArray, row(1).asInstanceOf[Seq[Int]].toArray)
})
val rankingMetrics = new RankingMetrics(bothItemsRDD)
val metric = $(metricName) match {
case "map" => rankingMetrics.meanAveragePrecision
case "ndcg@k" => rankingMetrics.ndcgAt($(k))
case "precision@k" => rankingMetrics.precisionAt($(k))
}
metric
}
override def copy(extra: ParamMap): RankingEvaluator = {
defaultCopy(extra)
}
}
ref:
https://spark.apache.org/docs/latest/mllib-evaluation-metrics.html#ranking-systems
https://www.safaribooksonline.com/library/view/spark-the-definitive/9781491912201/ch19.html#s6c5---recommendation
Apply Transformer on Multiple Columns
import org.apache.spark.ml.feature._
val userCategoricalColumnNames = Array("account_type", "clean_company", "clean_email", "clean_location")
val userCategoricalTransformers = userCategoricalColumnNames.flatMap((columnName: String) => {
val stringIndexer = new StringIndexer()
.setInputCol(columnName)
.setOutputCol(s"${columnName}_index")
.setHandleInvalid("keep")
val oneHotEncoder = new OneHotEncoder()
.setInputCol(s"${columnName}_index")
.setOutputCol(s"${columnName}_ohe")
.setDropLast(true)
Array(stringIndexer, oneHotEncoder)
})
userCategoricalTransformers.foreach(println)
// strIdx_4029f57e379a
// oneHot_f0decb92a05c
// strIdx_fb855ad6caaa
// oneHot_f1be19344002
// strIdx_7fa62a683293
// oneHot_097ae442d8fc
// strIdx_0ff7ffa022a1
// oneHot_4a9f72a7f5d8
ref:
https://stackoverflow.com/questions/34167105/using-spark-mls-onehotencoder-on-multiple-columns
Cross-validate a Pipeline Model
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
val vectorAssembler = new VectorAssembler()
.setInputCols(Array("feature1", "feature2", "feature3"))
.setOutputCol("features")
val lr = new LogisticRegression()
.setFeaturesCol("features")
.setLabelCol("starring")
val pipeline = new Pipeline()
.setStages(Array(vectorAssembler, lr))
val paramGrid = new ParamGridBuilder()
.addGrid(lr.maxIter, Array(20, 100))
.addGrid(lr.regParam, Array(0.0, 0.5, 1.0, 2.0))
.addGrid(lr.elasticNetParam, Array(0.0, 0.5, 1.0))
.build()
val evaluator = new BinaryClassificationEvaluator()
.setMetricName("areaUnderROC")
.setRawPredictionCol("rawPrediction")
.setLabelCol("starring")
val cv = new CrossValidator()
.setEstimator(pipeline)
.setEstimatorParamMaps(paramGrid)
.setEvaluator(evaluator)
.setNumFolds(3)
val cvModel = cv.fit(trainingDF)
ref:
https://spark.apache.org/docs/latest/ml-tuning.html#cross-validation
Extract Best Parameters from a Cross-validation Model
import org.apache.spark.ml.PipelineModel
import org.apache.spark.ml.classification.LogisticRegressionModel
val bestPipelineModel = cvModel.bestModel.asInstanceOf[PipelineModel]
// stage 0 is the VectorAssembler; the fitted LogisticRegressionModel is stage 1
val lrModel = bestPipelineModel.stages(1).asInstanceOf[LogisticRegressionModel]
lrModel.extractParamMap()
// or
lrModel.explainParams()
Show All Parameters of a Cross-validation Model
import org.apache.spark.ml.param.ParamMap
cvModel.getEstimatorParamMaps
.zip(cvModel.avgMetrics)
.sortWith(_._2 > _._2)
.foreach((pair: (ParamMap, Double)) => {
println(s"${pair._2}: ${pair._1}")
})
// 0.8999999999999999: {
// hashingTF_ac8be8d5806b-numFeatures: 1000,
// logreg_9f79de6e51ec-regParam: 0.1
// }
// 0.8875: {
// hashingTF_ac8be8d5806b-numFeatures: 100,
// logreg_9f79de6e51ec-regParam: 0.1
// }
// 0.875: {
// hashingTF_ac8be8d5806b-numFeatures: 100,
// logreg_9f79de6e51ec-regParam: 0.01
// }
ref:
https://stackoverflow.com/questions/31749593/how-to-extract-best-parameters-from-a-crossvalidatormodel
https://alvinalexander.com/scala/how-sort-scala-sequences-seq-list-array-buffer-vector-ordering-ordered