Spark ML cookbook (Scala)

Scala is the first class citizen language for interacting with Apache Spark, but it's difficult to learn. This article is mostly about Spark ML - the new Spark Machine Learning library which was rewritten in DataFrame-based API.

Convert a String Categorical Feature into Numeric One

StringIndexer converts labels (categorical values) into numbers (0.0, 1.0, 2.0 and so on) which ordered by label frequencies, the most frequnet label gets 0. This method is able to handle unseen labels with optional strategies.

StringIndexer's inputCol accepts string, numeric and boolean types.

val df1 = spark.createDataFrame(Seq(
    (1, "Python"),
    (2, "C++"),
    (3, "C++"),
    (4, "JavaScript"),
    (5, "Python"),
    (6, "Python"),
    (7, "Go")
BinaryClassificationEvaluator
)).toDF("repo_id", "repo_language")

val df2 = spark.createDataFrame(Seq(
    (1, "Python"),
    (2, "C++"),
    (3, "C++"),
    (4, "JavaScript"),
    (5, "Python"),
    (6, "Python"),
    (7, "Go"),
    (8, "JavaScript"),
    (9, "Brainfuck"),
    (10, "Brainfuck"),
    (11, "Red")
)).toDF("repo_id", "repo_language")

import org.apache.spark.ml.feature.StringIndexer

val stringIndexer = new StringIndexer()
  .setInputCol("repo_language")
  .setOutputCol("repo_language_index")
  .setHandleInvalid("keep")
val stringIndexerModel = stringIndexer.fit(df1)

stringIndexerModel.labels
// Array[String] = Array(Python, C++, JavaScript, Go)

val indexedDF = stringIndexerModel.transform(df2)
indexedDF.show()
// +-------+-------------+-------------------+
// |repo_id|repo_language|repo_language_index|
// +-------+-------------+-------------------+
// |      1|       Python|                0.0|
// |      2|          C++|                1.0|
// |      3|          C++|                1.0|
// |      4|   JavaScript|                3.0|
// |      5|       Python|                0.0|
// |      6|       Python|                0.0|
// |      7|           Go|                2.0|
// |      8|   JavaScript|                3.0|
// |      9|    Brainfuck|                4.0| <- previously unseen
// |     10|    Brainfuck|                4.0| <- previously unseen
// |     11|          Red|                4.0| <- previously unseen
// +-------+-------------+-------------------+

ref:
https://spark.apache.org/docs/latest/ml-features.html#stringindexer
https://stackoverflow.com/questions/34681534/spark-ml-stringindexer-handling-unseen-labels
https://stackoverflow.com/questions/32277576/how-to-handle-categorical-features-with-spark-ml/32278617

Convert an Indexed Numeric Feature Back to the Original Categorical One

import org.apache.spark.ml.feature.IndexToString

val indexToString = new IndexToString()
  .setInputCol("repo_language_index")
  .setOutputCol("repo_language_ori")

val oriIndexedDF = indexToString.transform(indexedDF)
oriIndexedDF.show()
// +-------+-------------+-------------------+----------------------+
// |repo_id|repo_language|repo_language_index|     repo_language_ori|
// +-------+-------------+-------------------+----------------------+
// |      1|       Python|                0.0|                Python|
// |      2|          C++|                1.0|                   C++|
// |      3|          C++|                1.0|                   C++|
// |      4|   JavaScript|                2.0|            JavaScript|
// |      5|       Python|                0.0|                Python|
// |      6|       Python|                0.0|                Python|
// |      7|           Go|                3.0|                    Go|
// |      8|   JavaScript|                2.0|            JavaScript|
// |      9|    Brainfuck|                4.0|             __unknown| <- previously unseen
// |     10|    Brainfuck|                4.0|             __unknown| <- previously unseen
// |     11|          Red|                4.0|             __unknown| <- previously unseen
// +-------+-------------+-------------------+----------------------+

ref:
https://spark.apache.org/docs/latest/ml-features.html#indextostring

One-hot Encoding for Categorical Features

OneHotEncoder's input column only accepts numeric types. If you have string columns, you need to use StringIndexer to transform them into doubles, bessides, StringIndexer is able to properly deal with unseen values. In my humble opinion, you should always apply StringIndexer before OneHotEncoder.

Be careful that OneHotEncoder's vector length will be the maximun value in the column, you must apply OneHotEncoder on the union dataset of both training set and test set. Since OneHotEncoder does not accept empty string for name, you need to replace all empty strings with a placeholder, something like __empty.

import org.apache.spark.ml.feature.OneHotEncoder

val knownDF = spark.createDataFrame(Seq(
  (2, "b"),
  (3, "c"),
  (0, "x"),
  (6, "c"),
  (4, "a"),
  (1, "a"),
  (5, "a")
)).toDF("category_1", "category_2")

val unseenDF = spark.createDataFrame(Seq(
  (123, "e"),
  (6, "c"),
  (2, "b"),
  (456, "c"),
  (1, "a")
)).toDF("category_1", "category_2")

val knownOneHotDF = new OneHotEncoder()
  .setDropLast(true)
  .setInputCol("category_1")
  .setOutputCol("category_1_one_hot")
  .transform(knownDF)
knownOneHotDF.show()
// +----------+----------+------------------+
// |category_1|category_2|category_1_one_hot|
// +----------+----------+------------------+
// |         2|         b|     (6,[2],[1.0])|
// |         3|         c|     (6,[3],[1.0])|
// |         0|         x|     (6,[0],[1.0])|
// |         6|         c|         (6,[],[])|
// |         4|         a|     (6,[4],[1.0])|
// |         1|         a|     (6,[1],[1.0])|
// |         5|         a|     (6,[5],[1.0])|
// +----------+----------+------------------+

val unseenOneHotDF = new OneHotEncoder()
  .setDropLast(true)
  .setInputCol("category_1")
  .setOutputCol("category_1_one_hot")
  .transform(unseenDF)
unseenOneHotDF.show()
// +----------+----------+------------------+
// |category_1|category_2|category_1_one_hot|
// +----------+----------+------------------+
// |       123|         e| (456,[123],[1.0])|
// |         6|         c|   (456,[6],[1.0])|
// |         2|         b|   (456,[2],[1.0])|
// |       456|         c|       (456,[],[])|
// |         1|         a|   (456,[1],[1.0])|
// +----------+----------+------------------+

ref:
https://spark.apache.org/docs/latest/ml-features.html#onehotencoder
https://stackoverflow.com/questions/32277576/how-to-handle-categorical-features-with-spark-ml/40615508
https://stackoverflow.com/questions/33089781/spark-dataframe-handing-empty-string-in-onehotencoder

Create a Regular Expression Tokenizer

setGaps(true) 時的 pattern 是 match 分隔符;setGaps(false) 時的 pattern 則是 match 字。

import org.apache.spark.ml.feature.RegexTokenizer
import org.apache.spark.sql.functions._

val sentenceDF = spark.createDataFrame(Seq(
  (1, "Hi, I heard about Spark"),
  (2, "I wish Java could use case classes."),
  (3, "Deep,Learning,models,are,state-of-the-art"),
  (4, "fuck_yeah!!! No.")
)).toDF("id", "sentence")

val countTokensUDF = udf((words: Seq[String]) => words.length)

val regexTokenizer = new RegexTokenizer()
  .setInputCol("sentence")
  .setOutputCol("words")
  .setPattern("""[\w\-_]+""").setGaps(false)
  // .setPattern("""\W""").setGaps(true)
  // .setPattern("""[,. ]""").setGaps(true)
val tokenizedDF = regexTokenizer.transform(sentenceDF)

val df = tokenizedDF
  .select("sentence", "words")
  .withColumn("count", countTokensUDF($"words"))
// +-----------------------------------------+-----------------------------------------------+-----+
// |sentence                                 |words                                          |count|
// +-----------------------------------------+-----------------------------------------------+-----+
// |Hi, I heard about Spark                  |[hi, i, heard, about, spark]                   |5    |
// |I wish Java could use case classes.      |[i, wish, java, could, use, case, classes]     |7    |
// |Deep,Learning,models,are,state-of-the-art|[deep, learning, models, are, state-of-the-art]|5    |
// |fuck_yeah!!! No.                         |[fuck_yeah, no]                                |2    |
// +-----------------------------------------+-----------------------------------------------+-----+

ref:
https://spark.apache.org/docs/latest/ml-features.html#tokenizer

Handle Comma-seperated Categorical Column

You could use RegexTokenizer, CountVectorizer or HashingTF.

import org.apache.spark.ml.feature.{RegexTokenizer, CountVectorizer}

val df = spark.createDataFrame(Seq(
  (1, "Action,Sci-Fi"),
  (2, "Sci-Fi,Romance,Horror"),
  (3, "War,Horror")
)).toDF("movie_id", "genres")

val regexTokenizer = new RegexTokenizer()
  .setInputCol("genres")
  .setOutputCol("genres_words")
  .setPattern("""[\w\-_]+""").setGaps(false)
val wordsDF = regexTokenizer.transform(df)

val countVectorizerModel = new CountVectorizer()
  .setInputCol("genres_words")
  .setOutputCol("genres_vector")
  .setMinDF(1) // for whole corpus, delete any term that appears less then n times
  .setMinTF(1) // for each document, delete any term that appears less then n times
  .fit(wordsDF)
val countVectorDF = countModel.transform(wordsDF)

// HashingTF might suffer from potential hash collisions
// it's good to use a power of two
val hashingTF = new HashingTF()
  .setInputCol("genres_words")
  .setOutputCol("genres_htf_vector")
  .setNumFeatures(4)
val htfVectorDF = hashingTF.transform(countVectorDF)

htfVectorDF.show(false)
// +--------+---------------------+-------------------------+-------------------------+-------------------+
// |movie_id|genres               |genres_words             |genres_count_vector      |genres_htf_vector  |
// +--------+---------------------+-------------------------+-------------------------+-------------------+
// |1       |Action,Sci-Fi        |[action, sci-fi]         |(5,[0,3],[1.0,1.0])      |(4,[0],[2.0])      |
// |2       |Sci-Fi,Romance,Horror|[sci-fi, romance, horror]|(5,[0,1,4],[1.0,1.0,1.0])|(4,[0,2],[2.0,1.0])|
// |3       |War,Horror           |[war, horror]            |(5,[1,2],[1.0,1.0])      |(4,[0,2],[1.0,1.0])|
// +--------+---------------------+-------------------------+-------------------------+-------------------+

countModel.vocabulary
// Array(sci-fi, horror, action, romance, war)

ref:
https://spark.apache.org/docs/latest/ml-features.html#countvectorizer
https://spark.apache.org/docs/latest/ml-features.html#tf-idf

Train a Word2Vec Model

The output vector of any Word2Vec model is dense!

import org.apache.spark.ml.feature.Word2Vec

val df = spark.createDataFrame(Seq(
  (1, "Hi I heard about Apache Spark".toLowerCase().split(" ")),
  (2, "I wish Java could use case classes".toLowerCase().split(" ")),
  (3, "Logistic regression models are neat".toLowerCase().split(" ")),
  (4, "Apache Spark with Scala is awesome".toLowerCase().split(" ")),
  (5, Array("中文", "嘛ㄟ通", "but", "必須", "另外", "分詞"))
)).toDF("id", "words")

val word2Vec = new Word2Vec()
  .setInputCol("words")
  .setOutputCol("words_w2v")
  .setMaxIter(10)
  .setVectorSize(3)
  .setWindowSize(5)
  .setMinCount(1)
val word2VecModel = word2Vec.fit(df)

word2VecModel.transform(df)
// +---+------------------------------------------+----------------------------------------------------------+
// |id |words                                     |words_w2v                                                 |
// +---+------------------------------------------+----------------------------------------------------------+
// |1  |[hi, i, heard, about, apache, spark]      |[-0.02013699459393,-0.02995631482274,0.047685102870066956]|
// |2  |[i, wish, java, could, use, case, classes]|[-0.05012317272186,0.01141336891094,-0.03742781743806387] |
// |3  |[logistic, regression, models, are, neat] |[-0.04678827972413,0.032994424477,0.0010566591750830413]  |
// |4  |[apache, spark, with, scala, is, awesome] |[0.0265524153169,0.02056275321716,0.013326843579610188]   |
// |5  |[中文, 嘛ㄟ通, but, 必須, 另外, 分詞]         |[0.0571783996973,-0.02301329133545,0.013507421438892681]  |
// +---+------------------------------------------+----------------------------------------------------------+

val df2 = spark.createDataFrame(Seq(
  (6, Array("not-in-vocabularies", "neither", "no")),
  (7, Array("spark", "not-in-vocabularies")),
  (8, Array("not-in-vocabularies", "spark")),
  (9, Array("no", "not-in-vocabularies", "spark")),
  (10, Array("中文", "spark"))
)).toDF("id", "words")

word2VecModel.transform(df2)
// the order of words doesn't mater
// +---+-------------------------------------+-----------------------------------------------------------------+
// |id |words                                |words_w2v                                                        |
// +---+-------------------------------------+-----------------------------------------------------------------+
// |6  |[not-in-vocabularies, neither, no]   |[0.0,0.0,0.0]                                                    |
// |7  |[spark, hell_no, not-in-vocabularies]|[0.0027440187210838,-0.0529780387878418,0.05730373660723368]     |
// |8  |[hell_no, not-in-vocabularies, spark]|[0.0027440187210838,-0.0529780387878418,0.05730373660723368]     |
// |9  |[not-in-vocabularies, hell_no, spark]|[0.0027440187210838,-0.0529780387878418,0.05730373660723368]     |
// |10 |[no, not-in-vocabularies, spark]     |[0.0027440187210838,-0.0529780387878418,0.05730373660723368]     |
// |11 |[中文, spark]                         |[-0.009499748703092337,-0.018227852880954742,0.13357853144407272]|
// +---+-------------------------------------+-----------------------------------------------------------------+

anotherWord2VecModel.findSynonyms("developer", 5)
// +-----------+------------------+
// |       word|        similarity|
// +-----------+------------------+
// |        dev| 0.881394624710083|
// |development|0.7730562090873718|
// |       oier|0.6866029500961304|
// |  develover|0.6720684766769409|
// |     webdev|0.6582568883895874|
// +-----------+------------------+

ref:
https://spark.apache.org/docs/latest/ml-features.html#word2vec

Calculate the Pearson Correlation between Features

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Matrix
import org.apache.spark.ml.stat.Correlation
import org.apache.spark.sql.Row

val featureNames = Array("stargazers_count", "forks_count", "subscribers_count")
val vectorAssembler = new VectorAssembler()
  .setInputCols(featureNames)
  .setOutputCol("features")

val df = vectorAssembler.transform(rawRepoInfoDS)
val correlationDF = Correlation.corr(df, "features")
val Row(coeff: Matrix) = correlationDF.head

println(featureNames.mkString(", "))
println(coeff.toString)
// stargazers_count, forks_count, subscribers_count
// 1.0                 0.5336901230713282  0.7664204175159971  
// 0.5336901230713282  1.0                 0.5414244966152617  
// 0.7664204175159971  0.5414244966152617  1.0

ref:
https://spark.apache.org/docs/latest/ml-statistics.html

DIMSUM

import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}

val repoWordRDD = repoVectorDF
  .select($"repo_id", $"text_w2v")
  .rdd
  .flatMap((row: Row) => {
    val repoId = row.getInt(0)
    val vector = row.getAs[DenseVector](1)
    vector.toArray.zipWithIndex.map({
      case (element, index) => MatrixEntry(repoId, index, element)
    })
  })
val repoWordMatrix = new CoordinateMatrix(repoWordRDD)
val wordRepoMatrix = repoWordMatrix.transpose

val repoSimilarityRDD = wordRepoMatrix
  .toRowMatrix
  .columnSimilarities(0.1)
  .entries
  .flatMap({
    case MatrixEntry(row: Long, col: Long, sim: Double) => {
      if (sim >= 0.5) {
        Array((row, col, sim))
      }
      else {
        None
      }
    }
  })
spark.createDataFrame(repoSimilarityRDD).toDF("item_1", "item_2", "similarity")
repoSimilarityDF.show(false)

ref:
https://stackoverflow.com/questions/42455725/columnsimilarities-back-to-spark-data-frame
https://forums.databricks.com/questions/248/when-should-i-use-rowmatrixcolumnsimilarities.html

Train a Locality Sensitive Hashing (LSH) Model: Bucketed Random Projection LSH

To specify the value of bucketLength, if input vectors are normalized, 1-10 times of pow(numRecords, -1/inputDim) would be a reasonable value. For instance, Math.pow(334913.0, -1.0 / 200.0) = 0.9383726472256705.

import org.apache.spark.ml.feature.BucketedRandomProjectionLSH
import org.apache.spark.ml.linalg.Vectors

val userDF = spark.createDataFrame(Seq(
  (1, Vectors.sparse(6, Seq((0, -4.0), (1, 1.0), (2, 0.2)))),
  (2, Vectors.sparse(6, Seq((0, 5.5), (1, -0.6), (2, 9.0)))),
  (3, Vectors.sparse(6, Seq((1, 1.0), (2, 5.3), (4, 3.0)))),
  (4, Vectors.sparse(6, Seq((1, 1.0), (2, 1.0), (4, 1.0)))),
  (5, Vectors.sparse(6, Seq((2, 1.0), (5, -0.2)))),
  (6, Vectors.sparse(6, Seq((0, 0.7)))),
  (7, Vectors.sparse(6, Seq((1, 0.3), (2, 1.0))))
)).toDF("user_id", "features")

val repoDF = spark.createDataFrame(Seq(
  (11, Vectors.sparse(6, Seq((0, 1.0), (1, 1.0), (2, 1.0), (3, 1.0), (4, 1.0), (5, 1.0)))),
  (12, Vectors.sparse(6, Seq((0, 9.0), (1, -2.0), (2, -21.0), (3, 9.0), (4, 1.0), (5, 9.0)))),
  (13, Vectors.sparse(6, Seq((0, 1.0), (1, 1.0), (2, -3.0), (3, 3.0), (4, 7.0), (5, 9.0)))),
  (14, Vectors.sparse(6, Seq((0, 1.0), (1, 1.0), (2, -3.0)))),
  (15, Vectors.sparse(6, Seq((1, 1.0), (2, 1.0))))
)).toDF("repo_id", "features")

val lsh = new BucketedRandomProjectionLSH()
  .setBucketLength(0.6812920690579612)
  .setNumHashTables(4)
  .setInputCol("features")
  .setOutputCol("hashes")
val lshModel = lsh.fit(repoDF)

val hashedUserDF = lshModel.transform(userDF)
val hashedRepoDF = lshModel.transform(repoDF)
hashedRepoDF.show(false)
// +-------+----------------------------------------------+--------------------------------+
// |repo_id|features                                      |hashes                          |
// +-------+----------------------------------------------+--------------------------------+
// |11     |(6,[0,1,2,3,4,5],[1.0,1.0,1.0,1.0,1.0,1.0])   |[[1.0], [-2.0], [-1.0], [-1.0]] |
// |12     |(6,[0,1,2,3,4,5],[9.0,-2.0,-21.0,9.0,1.0,9.0])|[[21.0], [-28.0], [18.0], [0.0]]|
// |13     |(6,[0,1,2,3,4,5],[1.0,1.0,-3.0,3.0,7.0,9.0])  |[[4.0], [-10.0], [6.0], [-3.0]] |
// |14     |(6,[0,1,2],[1.0,1.0,-3.0])                    |[[2.0], [-3.0], [2.0], [1.0]]   |
// |15     |(6,[1,2],[1.0,1.0])                           |[[-1.0], [0.0], [-2.0], [0.0]]  |
// +-------+----------------------------------------------+--------------------------------+

val similarDF = lshModel
  .approxSimilarityJoin(hashedUserDF, hashedRepoDF, 10.0, "distance")
  .select($"datasetA.user_id".alias("user_id"), $"datasetB.repo_id".alias("repo_id"), $"distance")
  .orderBy($"user_id", $"distance".asc)
similarDF.show(false)
// +-------+-------+------------------+
// |user_id|repo_id|distance          |
// +-------+-------+------------------+
// |1      |15     |4.079215610874228 |
// |3      |15     |5.243090691567332 |
// |4      |15     |1.0               |
// |4      |11     |1.7320508075688772|
// |5      |15     |1.019803902718557 |
// |5      |11     |2.33238075793812  |
// |6      |15     |1.57797338380595  |
// |7      |15     |0.7               |
// |7      |11     |2.118962010041709 |
// +-------+-------+------------------+

val userVector = Vectors.sparse(6, Seq((0, 1.5), (1, 0.8), (2, 2.0)))
val singleSimilarDF = lshModel
  .approxNearestNeighbors(hashedRepoDF, userVector, 5, "distance")
  .select($"repo_id", $"features", $"distance")
singleSimilarDF.show(false)
// +-------+----------------------------------------------+------------------+
// |repo_id|features                                      |distance          |
// +-------+----------------------------------------------+------------------+
// |15     |(6,[1,2],[1.0,1.0])                           |1.8138357147217055|
// |12     |(6,[0,1,2,3,4,5],[9.0,-2.0,-21.0,9.0,1.0,9.0])|27.49709075520536 |
// +-------+----------------------------------------------+------------------+

The problem of approxSimilarityJoin() is that you can't control the number of generated items, the disadvantage of approxNearestNeighbors() is that you have to manually iterate all users to find similar items. Moreover, both methods can easily suffer from the infamous java.lang.OutOfMemoryError.

ref:
https://spark.apache.org/docs/latest/ml-features.html#locality-sensitive-hashing

Train a Locality Sensitive Hashing (LSH) Model: MinHash LSH

MinHash LSH treats input as a binary vector, that is, all non-zero values (include negative values) are just 1. Basically, the Word2Vec vector won't be an appropriate input to MinHash LSH.

import org.apache.spark.ml.feature.MinHashLSH
import org.apache.spark.ml.linalg.Vectors

val userDF = spark.createDataFrame(Seq(
  (1, Vectors.sparse(6, Seq((0, -4.0), (1, 1.0), (2, 0.2)))),
  (2, Vectors.sparse(6, Seq((0, 5.5), (1, -0.6), (2, 9.0)))),
  (3, Vectors.sparse(6, Seq((1, 1.0), (2, 5.3), (4, 3.0)))),
  (4, Vectors.sparse(6, Seq((1, 1.0), (2, 1.0), (4, 1.0)))),
  (5, Vectors.sparse(6, Seq((2, 1.0), (5, -0.2)))),
  (6, Vectors.sparse(6, Seq((2, 0.7)))),
  (7, Vectors.sparse(6, Seq((3, 0.3), (5, 1.0))))
)).toDF("user_id", "features")

val repoDF = spark.createDataFrame(Seq(
  (11, Vectors.sparse(6, Seq((1, 1.0), (3, 1.0), (5, 1.0)))),
  (12, Vectors.sparse(6, Seq((2, 1.0), (3, 1.0), (5, 1.0)))),
  (13, Vectors.sparse(6, Seq((1, 1.0), (2, 1.0), (4, 1.0))))
)).toDF("repo_id", "features")

val lsh = new MinHashLSH()
  .setNumHashTables(4)
  .setInputCol("features")
  .setOutputCol("hashes")

val lshModel = lsh.fit(userDF)
val hashedUserDF = lshModel.transform(userDF)
val hashedRepoDF = lshModel.transform(repoDF)

hashedUserDF.show(false)
// user 1 and 2 have the same hashed vector
// user 3 and 4 have the same hashed vector
// +-------+--------------------------+-----------------------------------------------------------------------+
// |user_id|features                  |hashes                                                                 |
// +-------+--------------------------+-----------------------------------------------------------------------+
// |1      |(6,[0,1,2],[-4.0,1.0,0.2])|[[-2.031299587E9], [-1.974869772E9], [-1.974047307E9], [4.95314097E8]] |
// |2      |(6,[0,1,2],[5.5,-0.6,9.0])|[[-2.031299587E9], [-1.974869772E9], [-1.974047307E9], [4.95314097E8]] |
// |3      |(6,[1,2,4],[1.0,5.3,3.0]) |[[-2.031299587E9], [-1.974869772E9], [-1.230128022E9], [8.7126731E8]]  |
// |4      |(6,[1,2,4],[1.0,1.0,1.0]) |[[-2.031299587E9], [-1.974869772E9], [-1.230128022E9], [8.7126731E8]]  |
// |5      |(6,[2,5],[1.0,-0.2])      |[[-2.031299587E9], [-1.758749518E9], [-4.86208737E8], [-1.919887134E9]]|
// |6      |(6,[2],[0.7])             |[[-2.031299587E9], [-1.758749518E9], [-4.86208737E8], [1.247220523E9]] |
// |7      |(6,[3,5],[0.3,1.0])       |[[-1.278435698E9], [-1.542629264E9], [2.57710548E8], [-1.919887134E9]] |
// +-------+--------------------------+-----------------------------------------------------------------------+

val userSimilarRepoDF = lshModel
  .approxSimilarityJoin(hashedUserDF, hashedRepoDF, 0.6, "distance")
  .select($"datasetA.user_id".alias("user_id"), $"datasetB.repo_id".alias("repo_id"), $"distance")
  .orderBy($"user_id", $"distance".asc)

userSimilarRepoDF.show(false)
// +-------+-------+-------------------+
// |user_id|repo_id|distance           |
// +-------+-------+-------------------+
// |1      |13     |0.5                |
// |2      |13     |0.5                |
// |3      |13     |0.0                |
// |4      |13     |0.0                |
// |5      |12     |0.33333333333333337|
// |7      |12     |0.33333333333333337|
// |7      |11     |0.33333333333333337|
// +-------+-------+-------------------+

ref:
https://databricks.com/blog/2017/05/09/detecting-abuse-scale-locality-sensitive-hashing-uber-engineering.html

Train a Logistic Regression Model

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.Vectors

val training = spark.createDataFrame(Seq(
  (1.0, Vectors.dense(1.0, 2.5, 0.0, 0.0)),
  (1.0, Vectors.dense(0.1, 9.0, 0.0, 0.0)),
  (1.0, Vectors.dense(0.0, 0.0, 1.0, 0.0)),
  (0.0, Vectors.dense(0.0, 0.0, 2.0, 9.0)),
  (0.0, Vectors.dense(1.0, 0.0, 0.0, 5.0))
)).toDF("label", "features")

val lr = new LogisticRegression()
  .setMaxIter(100)
  .setRegParam(0.0)
  .setElasticNetParam(0.0)
  .setFamily("binomial")
  .setFeaturesCol("features")
  .setLabelCol("label")

lr.explainParams()

val lrModel = lr.fit(training)

println(s"Coefficients: ${lrModel.coefficients}")
// [2.0149015925419,2.694173163503675,9.547978766053463,-5.592221425156231]

println(s"Intercept: ${lrModel.intercept}")
// 8.552229795281482

val result = lrModel.transform(test)

ref:
https://spark.apache.org/docs/latest/ml-classification-regression.html#logistic-regression
https://spark.apache.org/docs/latest/mllib-linear-methods.html#logistic-regression

import org.apache.spark.ml.classification.BinaryLogisticRegressionSummary

val binarySummary = lrModel.summary.asInstanceOf[BinaryLogisticRegressionSummary]
println(s"Area Under ROC: ${binarySummary.areaUnderROC}")

ref:
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.ml.classification.BinaryLogisticRegressionTrainingSummary

Evaluate a Binary Classification Model

import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.linalg.Vectors

val df = spark.createDataFrame(Seq(
  (Vectors.dense(0.0, 2.5), 1.0), // correct
  (Vectors.dense(1.0, 4.1), 1.0), // correct
  (Vectors.dense(9.2, 1.1), 0.0), // correct
  (Vectors.dense(1.0, 0.1), 0.0), // correct
  (Vectors.dense(5.0, 0.5), 1.0)  // incorrect
)).toDF("rawPrediction", "starring")

val evaluator = new BinaryClassificationEvaluator()
  .setMetricName("areaUnderROC")
  .setRawPredictionCol("rawPrediction")
  .setLabelCol("starring")
val metric = evaluator.evaluate(df)
// 0.8333333333333333

ref:
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

Train an ALS Model

import org.apache.spark.ml.recommendation.ALS

val df = spark.createDataFrame(Seq(
  (1, 1, 12),
  (1, 2, 90),
  (1, 4, 4),
  (2, 4, 1),
  (3, 5, 8)
)).toDF("user", "item", "rating")

val als = new ALS()
  .setImplicitPrefs(true)
  .setRank(5)
  .setRegParam(0.5)
  .setAlpha(40)
  .setMaxIter(10)
  .setSeed(42)
  .setColdStartStrategy("drop")
val alsModel = als.fit(df)

val predictionDF = alsModel.transform(df)
// +----+----+------+----------+
// |user|item|rating|prediction|
// +----+----+------+----------+
// |   1|   1|    12| 0.9988487|
// |   3|   5|     8| 0.9984464|
// |   1|   4|     4|0.99887615|
// |   2|   4|     1| 0.9921428|
// |   1|   2|    90| 0.9997897|
// +----+----+------+----------+

predictionDF.printSchema()
// root
 // |-- user: integer (nullable = false)
 // |-- item: integer (nullable = false)
 // |-- rating: integer (nullable = false)
// |-- prediction: float (nullable = false)

val userRecommendationsDF = alsModel.recommendForAllUsers(15)
// +----+-----------------------------------------------------------------+
// |user|recommendations                                                  |
// +----+-----------------------------------------------------------------+
// |1   |[[2,0.9997897], [4,0.9988761], [1,0.9988487], [5,0.0]]           |
// |3   |[[5,0.9984464], [1,2.9802322E-8], [2,0.0], [4,0.0]]              |
// |2   |[[4,0.9921428], [2,0.10759391], [1,0.10749264], [5,1.4901161E-8]]|
// +----+-----------------------------------------------------------------+

userRecommendationsDF.printSchema()
// root
 // |-- user: integer (nullable = false)
 // |-- recommendations: array (nullable = true)
 // |    |-- element: struct (containsNull = true)
 // |    |    |-- item: integer (nullable = true)
// |    |    |-- rating: float (nullable = true)

ref:
https://spark.apache.org/docs/latest/ml-collaborative-filtering.html

Save and Load an ALS Model

import org.apache.hadoop.mapred.InvalidInputException
import org.apache.spark.ml.recommendation.{ALS, ALSModel}

val alsModelSavePath = "./spark-data/20170902/alsModel.parquet"
val alsModel: ALSModel = try {
  ALSModel.load(alsModelSavePath)
} catch {
  case e: InvalidInputException => {
    if (e.getMessage().contains("Input path does not exist")) {
      val als = new ALS()
        .setImplicitPrefs(true)
        .setRank(100)
        .setRegParam(0.5)
        .setAlpha(40)
        .setMaxIter(22)
        .setSeed(42)
        .setColdStartStrategy("drop")
        .setUserCol("user_id")
        .setItemCol("repo_id")
        .setRatingCol("starring")
      val alsModel = als.fit(rawRepoStarringDS)
      alsModel.save(alsModelSavePath)
      alsModel
    } else {
      throw e
    }
  }
}

Create a Custom Transformer

package ws.vinta.albedo.transformers

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.{DoubleParam, Param, ParamMap}
import org.apache.spark.ml.util.{DefaultParamsWritable, Identifiable}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Dataset, Row}

import scala.collection.mutable

class NegativeBalancer(override val uid: String, val bcPopularItems: Broadcast[mutable.LinkedHashSet[Int]])
  extends Transformer with DefaultParamsWritable {

  def this(bcPopularItems: Broadcast[mutable.LinkedHashSet[Int]]) = {
    this(Identifiable.randomUID("negativeBalancer"), bcPopularItems)
  }

  val userCol = new Param[String](this, "userCol", "User 所在的欄位名稱")

  def getUserCol: String = $(userCol)

  def setUserCol(value: String): this.type = set(userCol, value)
  setDefault(userCol -> "user")

  val itemCol = new Param[String](this, "itemCol", "Item 所在的欄位名稱")

  def getItemCol: String = $(itemCol)

  def setItemCol(value: String): this.type = set(itemCol, value)
  setDefault(itemCol -> "item")

  val labelCol = new Param[String](this, "labelCol", "Label 所在的欄位名稱")

  def getLabelCol: String = $(labelCol)

  def setLabelCol(value: String): this.type = set(labelCol, value)
  setDefault(labelCol -> "label")

  val negativeValue = new DoubleParam(this, "negativeValue", "負樣本的值")

  def getNegativeValue: Double = $(negativeValue)

  def setNegativeValue(value: Double): this.type = set(negativeValue, value)
  setDefault(negativeValue -> 0.0)

  val negativePositiveRatio = new DoubleParam(this, "negativePositiveRatio", "負樣本與正樣本的比例")

  def getNegativePositiveRatio: Double = $(negativePositiveRatio)

  def setNegativePositiveRatio(value: Double): this.type = set(negativePositiveRatio, value)
  setDefault(negativePositiveRatio -> 1.0)

  override def transformSchema(schema: StructType): StructType = {
    Map($(userCol) -> IntegerType, $(itemCol) -> IntegerType, $(labelCol) -> DoubleType)
      .foreach{
        case(columnName: String, expectedDataType: DataType) => {
          val actualDataType = schema(columnName).dataType
          require(actualDataType.equals(IntegerType), s"Column $columnName must be of type $expectedDataType but was actually $actualDataType.")
        }
      }

    schema
  }

  override def transform(dataset: Dataset[_]): DataFrame = {
    transformSchema(dataset.schema)

    val popularItems: mutable.LinkedHashSet[Int] = this.bcPopularItems.value

    val emptyItemSet = new mutable.HashSet[Int]
    val addToItemSet = (itemSet: mutable.HashSet[Int], item: Int) => itemSet += item
    val mergeItemSets = (set1: mutable.HashSet[Int], set2: mutable.HashSet[Int]) => set1 ++= set2

    val getUserNegativeItems = (userItemsPair: (Int, mutable.HashSet[Int])) => {
      val (user, positiveItems) = userItemsPair
      val negativeItems = popularItems.diff(positiveItems)
      val requiredNegativeItemsCount = (positiveItems.size * this.getNegativePositiveRatio).toInt
      (user, negativeItems.slice(0, requiredNegativeItemsCount))
    }
    val expandNegativeItems = (userItemsPair: (Int, mutable.LinkedHashSet[Int])) => {
      val (user, negativeItems) = userItemsPair
      negativeItems.map({(user, _, $(negativeValue))})
    }

    import dataset.sparkSession.implicits._

    // TODO: 目前是假設傳進來的 dataset 都是 positive samples,之後可能得處理含有 negative samples 的情況
    val negativeDF = dataset
      .select($(userCol), $(itemCol))
      .rdd
      .map({
        case Row(user: Int, item: Int) => (user, item)
      })
      .aggregateByKey(emptyItemSet)(addToItemSet, mergeItemSets)
      .map(getUserNegativeItems)
      .flatMap(expandNegativeItems)
      .toDF($(userCol), $(itemCol), $(labelCol))

    dataset.select($(userCol), $(itemCol), $(labelCol)).union(negativeDF)
  }

  override def copy(extra: ParamMap): this.type = {
    defaultCopy(extra)
  }
}

ref:
https://www.safaribooksonline.com/library/view/high-performance-spark/9781491943199/ch09.html#extending_spark_ml
https://stackoverflow.com/questions/40615713/how-to-write-a-custom-transformer-in-mllib
https://issues.apache.org/jira/browse/SPARK-17048

Create a Custom Evaluator

package ws.vinta.albedo.evaluators

import org.apache.spark.ml.evaluation.Evaluator
import org.apache.spark.ml.param.{Param, ParamMap}
import org.apache.spark.ml.util.{DefaultParamsWritable, Identifiable}
import org.apache.spark.mllib.evaluation.RankingMetrics
import org.apache.spark.sql.{DataFrame, Dataset, Row}

class RankingEvaluator(override val uid: String, val userActualItemsDF: DataFrame)
  extends Evaluator with DefaultParamsWritable {

  def this(userActualItemsDF: DataFrame) = {
    this(Identifiable.randomUID("rankingEvaluator"), userActualItemsDF)
  }

  val metricName = new Param[String](this, "metricName", "評估方式")

  def getMetricName: String = $(metricName)

  def setMetricName(value: String): this.type = set(metricName, value)
  setDefault(metricName -> "ndcg@k")

  val k = new Param[Int](this, "k", "只評估前 k 個 items 的排序結果")

  def getK: Int = $(k)

  def setK(value: Int): this.type = set(k, value)
  setDefault(k -> 15)

  override def isLargerBetter: Boolean = $(metricName) match {
    case "map" => true
    case "ndcg@k" => true
    case "precision@k" => true
  }

  override def evaluate(dataset: Dataset[_]): Double = {
    import dataset.sparkSession.implicits._

    val userPredictedItemsDF = dataset.select($"user_id", $"recommendations.repo_id".alias("items"))

    val bothItemsRDD = userPredictedItemsDF.join(userActualItemsDF, Seq("user_id", "user_id"))
      .select(userPredictedItemsDF.col("items"), userActualItemsDF.col("items"))
      .rdd
      .map((row: Row) => {
        // Row(userPredictedItems, userActualItems)
        (row(0).asInstanceOf[Seq[Int]].toArray, row(1).asInstanceOf[Seq[Int]].toArray)
      })

    val rankingMetrics = new RankingMetrics(bothItemsRDD)
    val metric = $(metricName) match {
      case "map" => rankingMetrics.meanAveragePrecision
      case "ndcg@k" => rankingMetrics.ndcgAt($(k))
      case "precision@k" => rankingMetrics.precisionAt($(k))
    }
    metric
  }

  override def copy(extra: ParamMap): RankingEvaluator = {
    defaultCopy(extra)
  }
}

ref:
https://spark.apache.org/docs/latest/mllib-evaluation-metrics.html#ranking-systems
https://www.safaribooksonline.com/library/view/spark-the-definitive/9781491912201/ch19.html#s6c5---recommendation

Apply Transformer on Multiple Columns

import org.apache.spark.ml.feature._

val userCategoricalColumnNames = Array("account_type", "clean_company", "clean_email", "clean_location")
val userCategoricalTransformers = userCategoricalColumnNames.flatMap((columnName: String) => {
  val stringIndexer = new StringIndexer()
    .setInputCol(columnName)
    .setOutputCol(s"${columnName}_index")
    .setHandleInvalid("keep")
  val oneHotEncoder = new OneHotEncoder()
    .setInputCol(s"${columnName}_index")
    .setOutputCol(s"${columnName}_ohe")
    .setDropLast(true)
  Array(stringIndexer, oneHotEncoder)
})
userCategoricalTransformers.foreach(println)
// strIdx_4029f57e379a
// oneHot_f0decb92a05c
// strIdx_fb855ad6caaa
// oneHot_f1be19344002
// strIdx_7fa62a683293
// oneHot_097ae442d8fc
// strIdx_0ff7ffa022a1
// oneHot_4a9f72a7f5d8

ref:
https://stackoverflow.com/questions/34167105/using-spark-mls-onehotencoder-on-multiple-columns

Cross-validate a Pipeline Model

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

val vectorAssembler = new VectorAssembler()
  .setInputCols(Array("feature1", "feature2", "feature3"))
  .setOutputCol("features")

val lr = new LogisticRegression()
  .setFeaturesCol("features")
  .setLabelCol("starring")

val pipeline = new Pipeline()
  .setStages(Array(vectorAssembler, lr))

val paramGrid = new ParamGridBuilder()
  .addGrid(lr.maxIter, Array(20, 100))
  .addGrid(lr.regParam, Array(0.0, 0.5, 1.0, 2.0))
  .addGrid(lr.elasticNetParam, Array(0.0, 0.5, 1.0))
  .build()

val evaluator = new BinaryClassificationEvaluator()
  .setMetricName("areaUnderROC")
  .setRawPredictionCol("rawPrediction")
  .setLabelCol("starring")

val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEstimatorParamMaps(paramGrid)
  .setEvaluator(evaluator)
  .setNumFolds(3)

val cvModel = cv.fit(trainingDF)

ref:
https://spark.apache.org/docs/latest/ml-tuning.html#cross-validation

Extract Best Parameters from a Cross-validation Model

import org.apache.spark.ml.PipelineModel
import org.apache.spark.ml.classification.LogisticRegressionModel

val bestPipelineModel = cvModel.bestModel.asInstanceOf[PipelineModel]
val lrModel = bestPipelineModel.stages(0).asInstanceOf[LogisticRegressionModel]
lrModel.extractParamMap()
// or
lrModel.explainParams()

ref:
https://stackoverflow.com/questions/31749593/how-to-extract-best-parameters-from-a-crossvalidatormodel

Show All Parameters of a Cross-validation Model

import org.apache.spark.ml.param.ParamMap

cvModel.getEstimatorParamMaps
  .zip(cvModel.avgMetrics)
  .sortWith(_._2 > _._2)
  .foreach((pair: (ParamMap, Double)) => {
    println(s"${pair._2}: ${pair._1}")
  })
// 0.8999999999999999: {
//     hashingTF_ac8be8d5806b-numFeatures: 1000,
//     logreg_9f79de6e51ec-regParam: 0.1
// }
// 0.8875: {
//     hashingTF_ac8be8d5806b-numFeatures: 100,
//     logreg_9f79de6e51ec-regParam: 0.1
// }
// 0.875: {
//     hashingTF_ac8be8d5806b-numFeatures: 100,
//     logreg_9f79de6e51ec-regParam: 0.01
// }

ref:
https://stackoverflow.com/questions/31749593/how-to-extract-best-parameters-from-a-crossvalidatormodel
https://alvinalexander.com/scala/how-sort-scala-sequences-seq-list-array-buffer-vector-ordering-ordered