碼天狗週刊 (CodeTengu Weekly) Issue 100 @vinta - Apache Spark, Scala, Machine Learning, Feature Engineering, MySQL

This article was also published in CodeTengu Weekly - Issue 100.

Big Data Analysis with Scala and Spark

Since I quit my job a while ago (I really wanted a long break~), I suddenly have a lot of free time, so I decided to take a few Coursera courses in between gaming sessions, and after a week I finally finished this one! It covers Scala and Spark and is the last course of the Functional Programming in Scala specialization (one of the courses in that series is taught by Martin Odersky, the creator of Scala). I've always learned new things from books, and this was my first time completing an entire course on Coursera; honestly, it was a great experience. Especially the first programming assignment: it was labeled as roughly 3 hours, yet it took me a whole afternoon, hahaha. Once I got the hang of it, the later assignments went quickly, but I still can't help complaining: Spark's typed Dataset API is really tedious to write, and its performance isn't even any better.

A side note: although I'm not about to start job hunting just yet, I've had a lot of spare time lately and got to know quite a few fellow engineers, trading plenty of technical experience (and industry gossip XD), which has been pretty great. So if you or your company are interested, feel free to get in touch and we can grab a meal together 👍

Side note number two: speaking of gaming, in December Ōkami HD (大神 絕景版) is coming to PC/PS4/Xbox One. It's one of the timeless classics of the PS2 era, and if you haven't played it yet, please do. And! Then! Ta-da! Anubis: Z.O.E., one of Hideo Kojima's masterpieces, is getting a remaster too!

Rules of Machine Learning: Best Practices for ML Engineering

This one is a big deal: if you work on machine learning, this document is all you need to read this week. Written by a Research Scientist at Google, it distills 43 best practices for building a machine learning system, many of them drawn from practical, software-engineering experience, which makes this kind of knowledge especially valuable. Honestly, I think this is also where software engineers can contribute during this AI wave, because any machine learning system or product, viewed from a high level, is essentially a software engineering problem.

Most of the problems you will face are, in fact, engineering problems.

Further reading (a commentary on this article by a veteran from RecsysChina):

Feature Engineering - Getting most out of data for predictive models

This slide deck is packed with material: besides covering almost everything in Mastering Feature Engineering, a book I just finished reading recently, it also presents many less common techniques. Only on the last page did I realize that the author of the deck has read that book too.

Speaking of which, after actually tinkering with feature engineering for a while, my impression is that it is essentially a craft, and what it really demands is creativity.

Further reading:

What happens when your application cannot open yet another connection to MySQL

At my previous job I had to write the recommendation results computed by Spark back into a MySQL database, and I hit the error ERROR 2003 (HY000): Can't connect to MySQL server on '192.168.x.x' (99). Many people have probably seen a similar message, though the number in the parentheses may differ. In this case the wording easily misleads you into thinking something is wrong with the MySQL server, but the key is actually the OS error code in the trailing parentheses. This article explains the cause and effect behind this error very clearly and is well worth reading.

In short, perror 99 tells you that this error code means Cannot assign requested address. It turned out my Spark job was too reckless and opened too many connections in a short period of time, exhausting the local ports (they need to sit through the TIME_WAIT cooldown), so no port could be assigned to a new MySQL connection. The fixes: reuse MySQL connections in your program or cap the concurrency, or alternatively tweak the net.ipv4.tcp_tw_reuse = 1 kernel setting.
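
If it helps, here is a minimal sketch of the "reuse connections" fix in Spark: open one JDBC connection per partition instead of one per row. The DataFrame, table, and column names below are made up for illustration.

import java.sql.DriverManager

recommendationDF.rdd.foreachPartition { rows =>
  // one connection and one prepared statement per partition, not per row
  val conn = DriverManager.getConnection(
    "jdbc:mysql://192.168.x.x:3306/albedo?useSSL=false", "username", "password")
  val stmt = conn.prepareStatement(
    "INSERT INTO recommendation (user_id, repo_id, score) VALUES (?, ?, ?)")
  try {
    rows.foreach { row =>
      stmt.setInt(1, row.getInt(0))
      stmt.setInt(2, row.getInt(1))
      stmt.setDouble(3, row.getDouble(2))
      stmt.addBatch()
    }
    stmt.executeBatch()
  } finally {
    stmt.close()
    conn.close()
  }
}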

I can't resist mentioning that a master/slave MySQL setup is still pretty solid: back then I recklessly wrote over four billion rows into it within a few minutes and MySQL acted like nothing had happened. I had originally been itching to use Cassandra, only to find that, at this stage, MySQL holds up just fine.

Further reading:

Spark ML cookbook (Scala)

Scala is the first-class language for interacting with Apache Spark, but it's difficult to learn. This article is mostly about Spark ML, the newer Spark machine learning library built around the DataFrame-based API.

Convert a String Categorical Feature into Numeric One

StringIndexer converts labels (categorical values) into numbers (0.0, 1.0, 2.0 and so on), ordered by label frequency; the most frequent label gets index 0.0. It can also handle previously unseen labels through the optional handleInvalid strategies.

StringIndexer's inputCol accepts string and numeric types (numeric values are cast to strings).

val df1 = spark.createDataFrame(Seq(
    (1, "Python"),
    (2, "C++"),
    (3, "C++"),
    (4, "JavaScript"),
    (5, "Python"),
    (6, "Python"),
    (7, "Go")
)).toDF("repo_id", "repo_language")

val df2 = spark.createDataFrame(Seq(
    (1, "Python"),
    (2, "C++"),
    (3, "C++"),
    (4, "JavaScript"),
    (5, "Python"),
    (6, "Python"),
    (7, "Go"),
    (8, "JavaScript"),
    (9, "Brainfuck"),
    (10, "Brainfuck"),
    (11, "Red")
)).toDF("repo_id", "repo_language")

import org.apache.spark.ml.feature.StringIndexer

val stringIndexer = new StringIndexer()
  .setInputCol("repo_language")
  .setOutputCol("repo_language_index")
  .setHandleInvalid("keep")
val stringIndexerModel = stringIndexer.fit(df1)

stringIndexerModel.labels
// Array[String] = Array(Python, C++, JavaScript, Go)

val indexedDF = stringIndexerModel.transform(df2)
indexedDF.show()
// +-------+-------------+-------------------+
// |repo_id|repo_language|repo_language_index|
// +-------+-------------+-------------------+
// |      1|       Python|                0.0|
// |      2|          C++|                1.0|
// |      3|          C++|                1.0|
// |      4|   JavaScript|                2.0|
// |      5|       Python|                0.0|
// |      6|       Python|                0.0|
// |      7|           Go|                3.0|
// |      8|   JavaScript|                2.0|
// |      9|    Brainfuck|                4.0| <- previously unseen
// |     10|    Brainfuck|                4.0| <- previously unseen
// |     11|          Red|                4.0| <- previously unseen
// +-------+-------------+-------------------+

ref:
https://spark.apache.org/docs/latest/ml-features.html#stringindexer
https://stackoverflow.com/questions/34681534/spark-ml-stringindexer-handling-unseen-labels
https://stackoverflow.com/questions/32277576/how-to-handle-categorical-features-with-spark-ml/32278617

Convert an Indexed Numeric Feature Back to the Original Categorical One

import org.apache.spark.ml.feature.IndexToString

val indexToString = new IndexToString()
  .setInputCol("repo_language_index")
  .setOutputCol("repo_language_ori")

val oriIndexedDF = indexToString.transform(indexedDF)
oriIndexedDF.show()
// +-------+-------------+-------------------+----------------------+
// |repo_id|repo_language|repo_language_index|     repo_language_ori|
// +-------+-------------+-------------------+----------------------+
// |      1|       Python|                0.0|                Python|
// |      2|          C++|                1.0|                   C++|
// |      3|          C++|                1.0|                   C++|
// |      4|   JavaScript|                2.0|            JavaScript|
// |      5|       Python|                0.0|                Python|
// |      6|       Python|                0.0|                Python|
// |      7|           Go|                3.0|                    Go|
// |      8|   JavaScript|                2.0|            JavaScript|
// |      9|    Brainfuck|                4.0|             __unknown| <- previously unseen
// |     10|    Brainfuck|                4.0|             __unknown| <- previously unseen
// |     11|          Red|                4.0|             __unknown| <- previously unseen
// +-------+-------------+-------------------+----------------------+

ref:
https://spark.apache.org/docs/latest/ml-features.html#indextostring

One-hot Encoding for Categorical Features

OneHotEncoder's input column only accepts numeric types. If you have string columns, you need to use StringIndexer to transform them into doubles first; besides, StringIndexer is able to properly deal with unseen values. In my humble opinion, you should always apply StringIndexer before OneHotEncoder (a combined sketch follows the example below).

Be careful: OneHotEncoder's vector length is determined by the maximum value in the column, so you must apply OneHotEncoder on the union of the training set and the test set. Also, since OneHotEncoder does not accept an empty string as an attribute name, you need to replace all empty strings with a placeholder, something like __empty.

import org.apache.spark.ml.feature.OneHotEncoder

val knownDF = spark.createDataFrame(Seq(
  (2, "b"),
  (3, "c"),
  (0, "x"),
  (6, "c"),
  (4, "a"),
  (1, "a"),
  (5, "a")
)).toDF("category_1", "category_2")

val unseenDF = spark.createDataFrame(Seq(
  (123, "e"),
  (6, "c"),
  (2, "b"),
  (456, "c"),
  (1, "a")
)).toDF("category_1", "category_2")

val knownOneHotDF = new OneHotEncoder()
  .setDropLast(true)
  .setInputCol("category_1")
  .setOutputCol("category_1_one_hot")
  .transform(knownDF)
knownOneHotDF.show()
// +----------+----------+------------------+
// |category_1|category_2|category_1_one_hot|
// +----------+----------+------------------+
// |         2|         b|     (6,[2],[1.0])|
// |         3|         c|     (6,[3],[1.0])|
// |         0|         x|     (6,[0],[1.0])|
// |         6|         c|         (6,[],[])|
// |         4|         a|     (6,[4],[1.0])|
// |         1|         a|     (6,[1],[1.0])|
// |         5|         a|     (6,[5],[1.0])|
// +----------+----------+------------------+

val unseenOneHotDF = new OneHotEncoder()
  .setDropLast(true)
  .setInputCol("category_1")
  .setOutputCol("category_1_one_hot")
  .transform(unseenDF)
unseenOneHotDF.show()
// +----------+----------+------------------+
// |category_1|category_2|category_1_one_hot|
// +----------+----------+------------------+
// |       123|         e| (456,[123],[1.0])|
// |         6|         c|   (456,[6],[1.0])|
// |         2|         b|   (456,[2],[1.0])|
// |       456|         c|       (456,[],[])|
// |         1|         a|   (456,[1],[1.0])|
// +----------+----------+------------------+
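
As suggested above, a minimal sketch of chaining StringIndexer and OneHotEncoder, reusing the knownDF and unseenDF defined in this example; setHandleInvalid("keep") absorbs unseen values such as "e" before the one-hot step.

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}

val indexer = new StringIndexer()
  .setInputCol("category_2")
  .setOutputCol("category_2_index")
  .setHandleInvalid("keep")
val encoder = new OneHotEncoder()
  .setInputCol("category_2_index")
  .setOutputCol("category_2_one_hot")
  .setDropLast(true)

// fit on the known data, then transform data that may contain unseen labels
val pipelineModel = new Pipeline().setStages(Array(indexer, encoder)).fit(knownDF)
pipelineModel.transform(unseenDF).show()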

ref:
https://spark.apache.org/docs/latest/ml-features.html#onehotencoder
https://stackoverflow.com/questions/32277576/how-to-handle-categorical-features-with-spark-ml/40615508
https://stackoverflow.com/questions/33089781/spark-dataframe-handing-empty-string-in-onehotencoder

Create a Regular Expression Tokenizer

With setGaps(true), the pattern matches the gaps (delimiters); with setGaps(false), the pattern matches the tokens themselves.

import org.apache.spark.ml.feature.RegexTokenizer
import org.apache.spark.sql.functions._

val sentenceDF = spark.createDataFrame(Seq(
  (1, "Hi, I heard about Spark"),
  (2, "I wish Java could use case classes."),
  (3, "Deep,Learning,models,are,state-of-the-art"),
  (4, "fuck_yeah!!! No.")
)).toDF("id", "sentence")

val countTokensUDF = udf((words: Seq[String]) => words.length)

val regexTokenizer = new RegexTokenizer()
  .setInputCol("sentence")
  .setOutputCol("words")
  .setPattern("""[\w\-_]+""").setGaps(false)
  // .setPattern("""\W""").setGaps(true)
  // .setPattern("""[,. ]""").setGaps(true)
val tokenizedDF = regexTokenizer.transform(sentenceDF)

val df = tokenizedDF
  .select("sentence", "words")
  .withColumn("count", countTokensUDF($"words"))
// +-----------------------------------------+-----------------------------------------------+-----+
// |sentence                                 |words                                          |count|
// +-----------------------------------------+-----------------------------------------------+-----+
// |Hi, I heard about Spark                  |[hi, i, heard, about, spark]                   |5    |
// |I wish Java could use case classes.      |[i, wish, java, could, use, case, classes]     |7    |
// |Deep,Learning,models,are,state-of-the-art|[deep, learning, models, are, state-of-the-art]|5    |
// |fuck_yeah!!! No.                         |[fuck_yeah, no]                                |2    |
// +-----------------------------------------+-----------------------------------------------+-----+

ref:
https://spark.apache.org/docs/latest/ml-features.html#tokenizer

Handle a Comma-separated Categorical Column

You could use RegexTokenizer, CountVectorizer or HashingTF.

import org.apache.spark.ml.feature.{CountVectorizer, HashingTF, RegexTokenizer}

val df = spark.createDataFrame(Seq(
  (1, "Action,Sci-Fi"),
  (2, "Sci-Fi,Romance,Horror"),
  (3, "War,Horror")
)).toDF("movie_id", "genres")

val regexTokenizer = new RegexTokenizer()
  .setInputCol("genres")
  .setOutputCol("genres_words")
  .setPattern("""[\w\-_]+""").setGaps(false)
val wordsDF = regexTokenizer.transform(df)

val countVectorizerModel = new CountVectorizer()
  .setInputCol("genres_words")
  .setOutputCol("genres_count_vector")
  .setMinDF(1) // for the whole corpus, ignore any term that appears in fewer than n documents
  .setMinTF(1) // for each document, ignore any term that appears less than n times
  .fit(wordsDF)
val countVectorDF = countVectorizerModel.transform(wordsDF)

// HashingTF might suffer from potential hash collisions
// it's good to use a power of two
val hashingTF = new HashingTF()
  .setInputCol("genres_words")
  .setOutputCol("genres_htf_vector")
  .setNumFeatures(4)
val htfVectorDF = hashingTF.transform(countVectorDF)

htfVectorDF.show(false)
// +--------+---------------------+-------------------------+-------------------------+-------------------+
// |movie_id|genres               |genres_words             |genres_count_vector      |genres_htf_vector  |
// +--------+---------------------+-------------------------+-------------------------+-------------------+
// |1       |Action,Sci-Fi        |[action, sci-fi]         |(5,[0,3],[1.0,1.0])      |(4,[0],[2.0])      |
// |2       |Sci-Fi,Romance,Horror|[sci-fi, romance, horror]|(5,[0,1,4],[1.0,1.0,1.0])|(4,[0,2],[2.0,1.0])|
// |3       |War,Horror           |[war, horror]            |(5,[1,2],[1.0,1.0])      |(4,[0,2],[1.0,1.0])|
// +--------+---------------------+-------------------------+-------------------------+-------------------+

countVectorizerModel.vocabulary
// Array(sci-fi, horror, action, romance, war)

ref:
https://spark.apache.org/docs/latest/ml-features.html#countvectorizer
https://spark.apache.org/docs/latest/ml-features.html#tf-idf

Train a Word2Vec Model

The output vector of any Word2Vec model is dense!

import org.apache.spark.ml.feature.Word2Vec

val df = spark.createDataFrame(Seq(
  (1, "Hi I heard about Apache Spark".toLowerCase().split(" ")),
  (2, "I wish Java could use case classes".toLowerCase().split(" ")),
  (3, "Logistic regression models are neat".toLowerCase().split(" ")),
  (4, "Apache Spark with Scala is awesome".toLowerCase().split(" ")),
  (5, Array("中文", "嘛ㄟ通", "but", "必須", "另外", "分詞"))
)).toDF("id", "words")

val word2Vec = new Word2Vec()
  .setInputCol("words")
  .setOutputCol("words_w2v")
  .setMaxIter(10)
  .setVectorSize(3)
  .setWindowSize(5)
  .setMinCount(1)
val word2VecModel = word2Vec.fit(df)

word2VecModel.transform(df)
// +---+------------------------------------------+----------------------------------------------------------+
// |id |words                                     |words_w2v                                                 |
// +---+------------------------------------------+----------------------------------------------------------+
// |1  |[hi, i, heard, about, apache, spark]      |[-0.02013699459393,-0.02995631482274,0.047685102870066956]|
// |2  |[i, wish, java, could, use, case, classes]|[-0.05012317272186,0.01141336891094,-0.03742781743806387] |
// |3  |[logistic, regression, models, are, neat] |[-0.04678827972413,0.032994424477,0.0010566591750830413]  |
// |4  |[apache, spark, with, scala, is, awesome] |[0.0265524153169,0.02056275321716,0.013326843579610188]   |
// |5  |[中文, 嘛ㄟ通, but, 必須, 另外, 分詞]         |[0.0571783996973,-0.02301329133545,0.013507421438892681]  |
// +---+------------------------------------------+----------------------------------------------------------+

val df2 = spark.createDataFrame(Seq(
  (6, Array("not-in-vocabularies", "neither", "no")),
  (7, Array("spark", "hell_no", "not-in-vocabularies")),
  (8, Array("hell_no", "not-in-vocabularies", "spark")),
  (9, Array("not-in-vocabularies", "hell_no", "spark")),
  (10, Array("no", "not-in-vocabularies", "spark")),
  (11, Array("中文", "spark"))
)).toDF("id", "words")

word2VecModel.transform(df2)
// the order of words doesn't matter; out-of-vocabulary words contribute nothing but still count toward the average
// +---+-------------------------------------+-----------------------------------------------------------------+
// |id |words                                |words_w2v                                                        |
// +---+-------------------------------------+-----------------------------------------------------------------+
// |6  |[not-in-vocabularies, neither, no]   |[0.0,0.0,0.0]                                                    |
// |7  |[spark, hell_no, not-in-vocabularies]|[0.0027440187210838,-0.0529780387878418,0.05730373660723368]     |
// |8  |[hell_no, not-in-vocabularies, spark]|[0.0027440187210838,-0.0529780387878418,0.05730373660723368]     |
// |9  |[not-in-vocabularies, hell_no, spark]|[0.0027440187210838,-0.0529780387878418,0.05730373660723368]     |
// |10 |[no, not-in-vocabularies, spark]     |[0.0027440187210838,-0.0529780387878418,0.05730373660723368]     |
// |11 |[中文, spark]                         |[-0.009499748703092337,-0.018227852880954742,0.13357853144407272]|
// +---+-------------------------------------+-----------------------------------------------------------------+

// anotherWord2VecModel: a different Word2Vec model, not the toy one fitted above
anotherWord2VecModel.findSynonyms("developer", 5)
// +-----------+------------------+
// |       word|        similarity|
// +-----------+------------------+
// |        dev| 0.881394624710083|
// |development|0.7730562090873718|
// |       oier|0.6866029500961304|
// |  develover|0.6720684766769409|
// |     webdev|0.6582568883895874|
// +-----------+------------------+

ref:
https://spark.apache.org/docs/latest/ml-features.html#word2vec

Calculate the Pearson Correlation between Features

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Matrix
import org.apache.spark.ml.stat.Correlation
import org.apache.spark.sql.Row

val featureNames = Array("stargazers_count", "forks_count", "subscribers_count")
val vectorAssembler = new VectorAssembler()
  .setInputCols(featureNames)
  .setOutputCol("features")

val df = vectorAssembler.transform(rawRepoInfoDS)
val correlationDF = Correlation.corr(df, "features")
val Row(coeff: Matrix) = correlationDF.head

println(featureNames.mkString(", "))
println(coeff.toString)
// stargazers_count, forks_count, subscribers_count
// 1.0                 0.5336901230713282  0.7664204175159971  
// 0.5336901230713282  1.0                 0.5414244966152617  
// 0.7664204175159971  0.5414244966152617  1.0

ref:
https://spark.apache.org/docs/latest/ml-statistics.html

DIMSUM

import org.apache.spark.ml.linalg.DenseVector
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
import org.apache.spark.sql.Row

val repoWordRDD = repoVectorDF
  .select($"repo_id", $"text_w2v")
  .rdd
  .flatMap((row: Row) => {
    val repoId = row.getInt(0)
    val vector = row.getAs[DenseVector](1)
    vector.toArray.zipWithIndex.map({
      case (element, index) => MatrixEntry(repoId, index, element)
    })
  })
val repoWordMatrix = new CoordinateMatrix(repoWordRDD)
val wordRepoMatrix = repoWordMatrix.transpose

// columnSimilarities() uses the DIMSUM sampling algorithm to approximate
// pairwise cosine similarities between columns above the given threshold
val repoSimilarityRDD = wordRepoMatrix
  .toRowMatrix
  .columnSimilarities(0.1)
  .entries
  .flatMap({
    case MatrixEntry(row: Long, col: Long, sim: Double) if sim >= 0.5 =>
      Some((row, col, sim))
    case _ =>
      None
  })
val repoSimilarityDF = spark.createDataFrame(repoSimilarityRDD).toDF("item_1", "item_2", "similarity")
repoSimilarityDF.show(false)

ref:
https://stackoverflow.com/questions/42455725/columnsimilarities-back-to-spark-data-frame
https://forums.databricks.com/questions/248/when-should-i-use-rowmatrixcolumnsimilarities.html

Train a Locality Sensitive Hashing (LSH) Model: Bucketed Random Projection LSH

When choosing the value of bucketLength: if the input vectors are normalized, 1 to 10 times pow(numRecords, -1/inputDim) is a reasonable value. For instance, Math.pow(334913.0, -1.0 / 200.0) = 0.9383726472256705.

import org.apache.spark.ml.feature.BucketedRandomProjectionLSH
import org.apache.spark.ml.linalg.Vectors

val userDF = spark.createDataFrame(Seq(
  (1, Vectors.sparse(6, Seq((0, -4.0), (1, 1.0), (2, 0.2)))),
  (2, Vectors.sparse(6, Seq((0, 5.5), (1, -0.6), (2, 9.0)))),
  (3, Vectors.sparse(6, Seq((1, 1.0), (2, 5.3), (4, 3.0)))),
  (4, Vectors.sparse(6, Seq((1, 1.0), (2, 1.0), (4, 1.0)))),
  (5, Vectors.sparse(6, Seq((2, 1.0), (5, -0.2)))),
  (6, Vectors.sparse(6, Seq((0, 0.7)))),
  (7, Vectors.sparse(6, Seq((1, 0.3), (2, 1.0))))
)).toDF("user_id", "features")

val repoDF = spark.createDataFrame(Seq(
  (11, Vectors.sparse(6, Seq((0, 1.0), (1, 1.0), (2, 1.0), (3, 1.0), (4, 1.0), (5, 1.0)))),
  (12, Vectors.sparse(6, Seq((0, 9.0), (1, -2.0), (2, -21.0), (3, 9.0), (4, 1.0), (5, 9.0)))),
  (13, Vectors.sparse(6, Seq((0, 1.0), (1, 1.0), (2, -3.0), (3, 3.0), (4, 7.0), (5, 9.0)))),
  (14, Vectors.sparse(6, Seq((0, 1.0), (1, 1.0), (2, -3.0)))),
  (15, Vectors.sparse(6, Seq((1, 1.0), (2, 1.0))))
)).toDF("repo_id", "features")

val lsh = new BucketedRandomProjectionLSH()
  .setBucketLength(0.6812920690579612)
  .setNumHashTables(4)
  .setInputCol("features")
  .setOutputCol("hashes")
val lshModel = lsh.fit(repoDF)

val hashedUserDF = lshModel.transform(userDF)
val hashedRepoDF = lshModel.transform(repoDF)
hashedRepoDF.show(false)
// +-------+----------------------------------------------+--------------------------------+
// |repo_id|features                                      |hashes                          |
// +-------+----------------------------------------------+--------------------------------+
// |11     |(6,[0,1,2,3,4,5],[1.0,1.0,1.0,1.0,1.0,1.0])   |[[1.0], [-2.0], [-1.0], [-1.0]] |
// |12     |(6,[0,1,2,3,4,5],[9.0,-2.0,-21.0,9.0,1.0,9.0])|[[21.0], [-28.0], [18.0], [0.0]]|
// |13     |(6,[0,1,2,3,4,5],[1.0,1.0,-3.0,3.0,7.0,9.0])  |[[4.0], [-10.0], [6.0], [-3.0]] |
// |14     |(6,[0,1,2],[1.0,1.0,-3.0])                    |[[2.0], [-3.0], [2.0], [1.0]]   |
// |15     |(6,[1,2],[1.0,1.0])                           |[[-1.0], [0.0], [-2.0], [0.0]]  |
// +-------+----------------------------------------------+--------------------------------+

val similarDF = lshModel
  .approxSimilarityJoin(hashedUserDF, hashedRepoDF, 10.0, "distance")
  .select($"datasetA.user_id".alias("user_id"), $"datasetB.repo_id".alias("repo_id"), $"distance")
  .orderBy($"user_id", $"distance".asc)
similarDF.show(false)
// +-------+-------+------------------+
// |user_id|repo_id|distance          |
// +-------+-------+------------------+
// |1      |15     |4.079215610874228 |
// |3      |15     |5.243090691567332 |
// |4      |15     |1.0               |
// |4      |11     |1.7320508075688772|
// |5      |15     |1.019803902718557 |
// |5      |11     |2.33238075793812  |
// |6      |15     |1.57797338380595  |
// |7      |15     |0.7               |
// |7      |11     |2.118962010041709 |
// +-------+-------+------------------+

val userVector = Vectors.sparse(6, Seq((0, 1.5), (1, 0.8), (2, 2.0)))
val singleSimilarDF = lshModel
  .approxNearestNeighbors(hashedRepoDF, userVector, 5, "distance")
  .select($"repo_id", $"features", $"distance")
singleSimilarDF.show(false)
// +-------+----------------------------------------------+------------------+
// |repo_id|features                                      |distance          |
// +-------+----------------------------------------------+------------------+
// |15     |(6,[1,2],[1.0,1.0])                           |1.8138357147217055|
// |12     |(6,[0,1,2,3,4,5],[9.0,-2.0,-21.0,9.0,1.0,9.0])|27.49709075520536 |
// +-------+----------------------------------------------+------------------+

The problem with approxSimilarityJoin() is that you can't control the number of generated items per user; the disadvantage of approxNearestNeighbors() is that you have to manually iterate over all users to find their similar items. Moreover, both methods can easily suffer from the infamous java.lang.OutOfMemoryError.
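
As a workaround for the first issue, one possible sketch is to rank the approxSimilarityJoin() output per user with a window function and keep only the top N rows, using the similarDF computed above:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

// keep only the N nearest repos per user
val topN = 2
val perUserWindow = Window.partitionBy($"user_id").orderBy($"distance".asc)
val topNSimilarDF = similarDF
  .withColumn("rank", row_number().over(perUserWindow))
  .where($"rank" <= topN)
  .drop("rank")
topNSimilarDF.show(false)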

ref:
https://spark.apache.org/docs/latest/ml-features.html#locality-sensitive-hashing

Train a Locality Sensitive Hashing (LSH) Model: MinHash LSH

MinHash LSH treats the input as a binary vector: all non-zero values (including negative ones) are treated as 1. Consequently, Word2Vec vectors are generally not an appropriate input for MinHash LSH.

import org.apache.spark.ml.feature.MinHashLSH
import org.apache.spark.ml.linalg.Vectors

val userDF = spark.createDataFrame(Seq(
  (1, Vectors.sparse(6, Seq((0, -4.0), (1, 1.0), (2, 0.2)))),
  (2, Vectors.sparse(6, Seq((0, 5.5), (1, -0.6), (2, 9.0)))),
  (3, Vectors.sparse(6, Seq((1, 1.0), (2, 5.3), (4, 3.0)))),
  (4, Vectors.sparse(6, Seq((1, 1.0), (2, 1.0), (4, 1.0)))),
  (5, Vectors.sparse(6, Seq((2, 1.0), (5, -0.2)))),
  (6, Vectors.sparse(6, Seq((2, 0.7)))),
  (7, Vectors.sparse(6, Seq((3, 0.3), (5, 1.0))))
)).toDF("user_id", "features")

val repoDF = spark.createDataFrame(Seq(
  (11, Vectors.sparse(6, Seq((1, 1.0), (3, 1.0), (5, 1.0)))),
  (12, Vectors.sparse(6, Seq((2, 1.0), (3, 1.0), (5, 1.0)))),
  (13, Vectors.sparse(6, Seq((1, 1.0), (2, 1.0), (4, 1.0))))
)).toDF("repo_id", "features")

val lsh = new MinHashLSH()
  .setNumHashTables(4)
  .setInputCol("features")
  .setOutputCol("hashes")

val lshModel = lsh.fit(userDF)
val hashedUserDF = lshModel.transform(userDF)
val hashedRepoDF = lshModel.transform(repoDF)

hashedUserDF.show(false)
// user 1 and 2 have the same hashed vector
// user 3 and 4 have the same hashed vector
// +-------+--------------------------+-----------------------------------------------------------------------+
// |user_id|features                  |hashes                                                                 |
// +-------+--------------------------+-----------------------------------------------------------------------+
// |1      |(6,[0,1,2],[-4.0,1.0,0.2])|[[-2.031299587E9], [-1.974869772E9], [-1.974047307E9], [4.95314097E8]] |
// |2      |(6,[0,1,2],[5.5,-0.6,9.0])|[[-2.031299587E9], [-1.974869772E9], [-1.974047307E9], [4.95314097E8]] |
// |3      |(6,[1,2,4],[1.0,5.3,3.0]) |[[-2.031299587E9], [-1.974869772E9], [-1.230128022E9], [8.7126731E8]]  |
// |4      |(6,[1,2,4],[1.0,1.0,1.0]) |[[-2.031299587E9], [-1.974869772E9], [-1.230128022E9], [8.7126731E8]]  |
// |5      |(6,[2,5],[1.0,-0.2])      |[[-2.031299587E9], [-1.758749518E9], [-4.86208737E8], [-1.919887134E9]]|
// |6      |(6,[2],[0.7])             |[[-2.031299587E9], [-1.758749518E9], [-4.86208737E8], [1.247220523E9]] |
// |7      |(6,[3,5],[0.3,1.0])       |[[-1.278435698E9], [-1.542629264E9], [2.57710548E8], [-1.919887134E9]] |
// +-------+--------------------------+-----------------------------------------------------------------------+

val userSimilarRepoDF = lshModel
  .approxSimilarityJoin(hashedUserDF, hashedRepoDF, 0.6, "distance")
  .select($"datasetA.user_id".alias("user_id"), $"datasetB.repo_id".alias("repo_id"), $"distance")
  .orderBy($"user_id", $"distance".asc)

userSimilarRepoDF.show(false)
// +-------+-------+-------------------+
// |user_id|repo_id|distance           |
// +-------+-------+-------------------+
// |1      |13     |0.5                |
// |2      |13     |0.5                |
// |3      |13     |0.0                |
// |4      |13     |0.0                |
// |5      |12     |0.33333333333333337|
// |7      |12     |0.33333333333333337|
// |7      |11     |0.33333333333333337|
// +-------+-------+-------------------+

ref:
https://databricks.com/blog/2017/05/09/detecting-abuse-scale-locality-sensitive-hashing-uber-engineering.html

Train a Logistic Regression Model

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.Vectors

val training = spark.createDataFrame(Seq(
  (1.0, Vectors.dense(1.0, 2.5, 0.0, 0.0)),
  (1.0, Vectors.dense(0.1, 9.0, 0.0, 0.0)),
  (1.0, Vectors.dense(0.0, 0.0, 1.0, 0.0)),
  (0.0, Vectors.dense(0.0, 0.0, 2.0, 9.0)),
  (0.0, Vectors.dense(1.0, 0.0, 0.0, 5.0))
)).toDF("label", "features")

val lr = new LogisticRegression()
  .setMaxIter(100)
  .setRegParam(0.0)
  .setElasticNetParam(0.0)
  .setFamily("binomial")
  .setFeaturesCol("features")
  .setLabelCol("label")

lr.explainParams()

val lrModel = lr.fit(training)

println(s"Coefficients: ${lrModel.coefficients}")
// [2.0149015925419,2.694173163503675,9.547978766053463,-5.592221425156231]

println(s"Intercept: ${lrModel.intercept}")
// 8.552229795281482

// "test" is a DataFrame with the same "features" column as the training data
val result = lrModel.transform(test)

ref:
https://spark.apache.org/docs/latest/ml-classification-regression.html#logistic-regression
https://spark.apache.org/docs/latest/mllib-linear-methods.html#logistic-regression

import org.apache.spark.ml.classification.BinaryLogisticRegressionSummary

val binarySummary = lrModel.summary.asInstanceOf[BinaryLogisticRegressionSummary]
println(s"Area Under ROC: ${binarySummary.areaUnderROC}")

ref:
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.ml.classification.BinaryLogisticRegressionTrainingSummary

Evaluate a Binary Classification Model

import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.linalg.Vectors

val df = spark.createDataFrame(Seq(
  (Vectors.dense(0.0, 2.5), 1.0), // correct
  (Vectors.dense(1.0, 4.1), 1.0), // correct
  (Vectors.dense(9.2, 1.1), 0.0), // correct
  (Vectors.dense(1.0, 0.1), 0.0), // correct
  (Vectors.dense(5.0, 0.5), 1.0)  // incorrect
)).toDF("rawPrediction", "starring")

val evaluator = new BinaryClassificationEvaluator()
  .setMetricName("areaUnderROC")
  .setRawPredictionCol("rawPrediction")
  .setLabelCol("starring")
val metric = evaluator.evaluate(df)
// 0.8333333333333333

ref:
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

Train an ALS Model

import org.apache.spark.ml.recommendation.ALS

val df = spark.createDataFrame(Seq(
  (1, 1, 12),
  (1, 2, 90),
  (1, 4, 4),
  (2, 4, 1),
  (3, 5, 8)
)).toDF("user", "item", "rating")

val als = new ALS()
  .setImplicitPrefs(true)
  .setRank(5)
  .setRegParam(0.5)
  .setAlpha(40)
  .setMaxIter(10)
  .setSeed(42)
  .setColdStartStrategy("drop")
val alsModel = als.fit(df)

val predictionDF = alsModel.transform(df)
// +----+----+------+----------+
// |user|item|rating|prediction|
// +----+----+------+----------+
// |   1|   1|    12| 0.9988487|
// |   3|   5|     8| 0.9984464|
// |   1|   4|     4|0.99887615|
// |   2|   4|     1| 0.9921428|
// |   1|   2|    90| 0.9997897|
// +----+----+------+----------+

predictionDF.printSchema()
// root
// |-- user: integer (nullable = false)
// |-- item: integer (nullable = false)
// |-- rating: integer (nullable = false)
// |-- prediction: float (nullable = false)

val userRecommendationsDF = alsModel.recommendForAllUsers(15)
// +----+-----------------------------------------------------------------+
// |user|recommendations                                                  |
// +----+-----------------------------------------------------------------+
// |1   |[[2,0.9997897], [4,0.9988761], [1,0.9988487], [5,0.0]]           |
// |3   |[[5,0.9984464], [1,2.9802322E-8], [2,0.0], [4,0.0]]              |
// |2   |[[4,0.9921428], [2,0.10759391], [1,0.10749264], [5,1.4901161E-8]]|
// +----+-----------------------------------------------------------------+

userRecommendationsDF.printSchema()
// root
// |-- user: integer (nullable = false)
// |-- recommendations: array (nullable = true)
// |    |-- element: struct (containsNull = true)
// |    |    |-- item: integer (nullable = true)
// |    |    |-- rating: float (nullable = true)

ref:
https://spark.apache.org/docs/latest/ml-collaborative-filtering.html

Save and Load an ALS Model

import org.apache.hadoop.mapred.InvalidInputException
import org.apache.spark.ml.recommendation.{ALS, ALSModel}

val alsModelSavePath = "./spark-data/20170902/alsModel.parquet"
val alsModel: ALSModel = try {
  ALSModel.load(alsModelSavePath)
} catch {
  case e: InvalidInputException => {
    if (e.getMessage().contains("Input path does not exist")) {
      val als = new ALS()
        .setImplicitPrefs(true)
        .setRank(100)
        .setRegParam(0.5)
        .setAlpha(40)
        .setMaxIter(22)
        .setSeed(42)
        .setColdStartStrategy("drop")
        .setUserCol("user_id")
        .setItemCol("repo_id")
        .setRatingCol("starring")
      val alsModel = als.fit(rawRepoStarringDS)
      alsModel.save(alsModelSavePath)
      alsModel
    } else {
      throw e
    }
  }
}

Create a Custom Transformer

package ws.vinta.albedo.transformers

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.{DoubleParam, Param, ParamMap}
import org.apache.spark.ml.util.{DefaultParamsWritable, Identifiable}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Dataset, Row}

import scala.collection.mutable

class NegativeBalancer(override val uid: String, val bcPopularItems: Broadcast[mutable.LinkedHashSet[Int]])
  extends Transformer with DefaultParamsWritable {

  def this(bcPopularItems: Broadcast[mutable.LinkedHashSet[Int]]) = {
    this(Identifiable.randomUID("negativeBalancer"), bcPopularItems)
  }

  val userCol = new Param[String](this, "userCol", "The column name of the user id")

  def getUserCol: String = $(userCol)

  def setUserCol(value: String): this.type = set(userCol, value)
  setDefault(userCol -> "user")

  val itemCol = new Param[String](this, "itemCol", "The column name of the item id")

  def getItemCol: String = $(itemCol)

  def setItemCol(value: String): this.type = set(itemCol, value)
  setDefault(itemCol -> "item")

  val labelCol = new Param[String](this, "labelCol", "The column name of the label")

  def getLabelCol: String = $(labelCol)

  def setLabelCol(value: String): this.type = set(labelCol, value)
  setDefault(labelCol -> "label")

  val negativeValue = new DoubleParam(this, "negativeValue", "The value assigned to negative samples")

  def getNegativeValue: Double = $(negativeValue)

  def setNegativeValue(value: Double): this.type = set(negativeValue, value)
  setDefault(negativeValue -> 0.0)

  val negativePositiveRatio = new DoubleParam(this, "negativePositiveRatio", "The ratio of negative samples to positive samples")

  def getNegativePositiveRatio: Double = $(negativePositiveRatio)

  def setNegativePositiveRatio(value: Double): this.type = set(negativePositiveRatio, value)
  setDefault(negativePositiveRatio -> 1.0)

  override def transformSchema(schema: StructType): StructType = {
    Map($(userCol) -> IntegerType, $(itemCol) -> IntegerType, $(labelCol) -> DoubleType)
      .foreach{
        case(columnName: String, expectedDataType: DataType) => {
          val actualDataType = schema(columnName).dataType
          require(actualDataType.equals(expectedDataType), s"Column $columnName must be of type $expectedDataType but was actually $actualDataType.")
        }
      }

    schema
  }

  override def transform(dataset: Dataset[_]): DataFrame = {
    transformSchema(dataset.schema)

    val popularItems: mutable.LinkedHashSet[Int] = this.bcPopularItems.value

    val emptyItemSet = new mutable.HashSet[Int]
    val addToItemSet = (itemSet: mutable.HashSet[Int], item: Int) => itemSet += item
    val mergeItemSets = (set1: mutable.HashSet[Int], set2: mutable.HashSet[Int]) => set1 ++= set2

    val getUserNegativeItems = (userItemsPair: (Int, mutable.HashSet[Int])) => {
      val (user, positiveItems) = userItemsPair
      val negativeItems = popularItems.diff(positiveItems)
      val requiredNegativeItemsCount = (positiveItems.size * this.getNegativePositiveRatio).toInt
      (user, negativeItems.slice(0, requiredNegativeItemsCount))
    }
    val expandNegativeItems = (userItemsPair: (Int, mutable.LinkedHashSet[Int])) => {
      val (user, negativeItems) = userItemsPair
      negativeItems.map({(user, _, $(negativeValue))})
    }

    import dataset.sparkSession.implicits._

    // TODO: this currently assumes the incoming dataset only contains positive samples; handling datasets that already include negative samples may be needed later
    val negativeDF = dataset
      .select($(userCol), $(itemCol))
      .rdd
      .map({
        case Row(user: Int, item: Int) => (user, item)
      })
      .aggregateByKey(emptyItemSet)(addToItemSet, mergeItemSets)
      .map(getUserNegativeItems)
      .flatMap(expandNegativeItems)
      .toDF($(userCol), $(itemCol), $(labelCol))

    dataset.select($(userCol), $(itemCol), $(labelCol)).union(negativeDF)
  }

  override def copy(extra: ParamMap): this.type = {
    defaultCopy(extra)
  }
}

ref:
https://www.safaribooksonline.com/library/view/high-performance-spark/9781491943199/ch09.html#extending_spark_ml
https://stackoverflow.com/questions/40615713/how-to-write-a-custom-transformer-in-mllib
https://issues.apache.org/jira/browse/SPARK-17048

Create a Custom Evaluator

package ws.vinta.albedo.evaluators

import org.apache.spark.ml.evaluation.Evaluator
import org.apache.spark.ml.param.{Param, ParamMap}
import org.apache.spark.ml.util.{DefaultParamsWritable, Identifiable}
import org.apache.spark.mllib.evaluation.RankingMetrics
import org.apache.spark.sql.{DataFrame, Dataset, Row}

class RankingEvaluator(override val uid: String, val userActualItemsDF: DataFrame)
  extends Evaluator with DefaultParamsWritable {

  def this(userActualItemsDF: DataFrame) = {
    this(Identifiable.randomUID("rankingEvaluator"), userActualItemsDF)
  }

  val metricName = new Param[String](this, "metricName", "The evaluation metric")

  def getMetricName: String = $(metricName)

  def setMetricName(value: String): this.type = set(metricName, value)
  setDefault(metricName -> "ndcg@k")

  val k = new Param[Int](this, "k", "Only evaluate the ranking of the top k items")

  def getK: Int = $(k)

  def setK(value: Int): this.type = set(k, value)
  setDefault(k -> 15)

  override def isLargerBetter: Boolean = $(metricName) match {
    case "map" => true
    case "ndcg@k" => true
    case "precision@k" => true
  }

  override def evaluate(dataset: Dataset[_]): Double = {
    import dataset.sparkSession.implicits._

    val userPredictedItemsDF = dataset.select($"user_id", $"recommendations.repo_id".alias("items"))

    val bothItemsRDD = userPredictedItemsDF.join(userActualItemsDF, Seq("user_id"))
      .select(userPredictedItemsDF.col("items"), userActualItemsDF.col("items"))
      .rdd
      .map((row: Row) => {
        // Row(userPredictedItems, userActualItems)
        (row(0).asInstanceOf[Seq[Int]].toArray, row(1).asInstanceOf[Seq[Int]].toArray)
      })

    val rankingMetrics = new RankingMetrics(bothItemsRDD)
    val metric = $(metricName) match {
      case "map" => rankingMetrics.meanAveragePrecision
      case "ndcg@k" => rankingMetrics.ndcgAt($(k))
      case "precision@k" => rankingMetrics.precisionAt($(k))
    }
    metric
  }

  override def copy(extra: ParamMap): RankingEvaluator = {
    defaultCopy(extra)
  }
}

ref:
https://spark.apache.org/docs/latest/mllib-evaluation-metrics.html#ranking-systems
https://www.safaribooksonline.com/library/view/spark-the-definitive/9781491912201/ch19.html#s6c5---recommendation

Apply Transformer on Multiple Columns

import org.apache.spark.ml.feature._

val userCategoricalColumnNames = Array("account_type", "clean_company", "clean_email", "clean_location")
val userCategoricalTransformers = userCategoricalColumnNames.flatMap((columnName: String) => {
  val stringIndexer = new StringIndexer()
    .setInputCol(columnName)
    .setOutputCol(s"${columnName}_index")
    .setHandleInvalid("keep")
  val oneHotEncoder = new OneHotEncoder()
    .setInputCol(s"${columnName}_index")
    .setOutputCol(s"${columnName}_ohe")
    .setDropLast(true)
  Array(stringIndexer, oneHotEncoder)
})
userCategoricalTransformers.foreach(println)
// strIdx_4029f57e379a
// oneHot_f0decb92a05c
// strIdx_fb855ad6caaa
// oneHot_f1be19344002
// strIdx_7fa62a683293
// oneHot_097ae442d8fc
// strIdx_0ff7ffa022a1
// oneHot_4a9f72a7f5d8

ref:
https://stackoverflow.com/questions/34167105/using-spark-mls-onehotencoder-on-multiple-columns

Cross-validate a Pipeline Model

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

val vectorAssembler = new VectorAssembler()
  .setInputCols(Array("feature1", "feature2", "feature3"))
  .setOutputCol("features")

val lr = new LogisticRegression()
  .setFeaturesCol("features")
  .setLabelCol("starring")

val pipeline = new Pipeline()
  .setStages(Array(vectorAssembler, lr))

val paramGrid = new ParamGridBuilder()
  .addGrid(lr.maxIter, Array(20, 100))
  .addGrid(lr.regParam, Array(0.0, 0.5, 1.0, 2.0))
  .addGrid(lr.elasticNetParam, Array(0.0, 0.5, 1.0))
  .build()

val evaluator = new BinaryClassificationEvaluator()
  .setMetricName("areaUnderROC")
  .setRawPredictionCol("rawPrediction")
  .setLabelCol("starring")

val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEstimatorParamMaps(paramGrid)
  .setEvaluator(evaluator)
  .setNumFolds(3)

val cvModel = cv.fit(trainingDF)

ref:
https://spark.apache.org/docs/latest/ml-tuning.html#cross-validation

Extract Best Parameters from a Cross-validation Model

import org.apache.spark.ml.PipelineModel
import org.apache.spark.ml.classification.LogisticRegressionModel

val bestPipelineModel = cvModel.bestModel.asInstanceOf[PipelineModel]
// in the pipeline above, the LogisticRegressionModel is the last stage
val lrModel = bestPipelineModel.stages.last.asInstanceOf[LogisticRegressionModel]
lrModel.extractParamMap()
// or
lrModel.explainParams()

ref:
https://stackoverflow.com/questions/31749593/how-to-extract-best-parameters-from-a-crossvalidatormodel

Show All Parameters of a Cross-validation Model

import org.apache.spark.ml.param.ParamMap

cvModel.getEstimatorParamMaps
  .zip(cvModel.avgMetrics)
  .sortWith(_._2 > _._2)
  .foreach((pair: (ParamMap, Double)) => {
    println(s"${pair._2}: ${pair._1}")
  })
// 0.8999999999999999: {
//     hashingTF_ac8be8d5806b-numFeatures: 1000,
//     logreg_9f79de6e51ec-regParam: 0.1
// }
// 0.8875: {
//     hashingTF_ac8be8d5806b-numFeatures: 100,
//     logreg_9f79de6e51ec-regParam: 0.1
// }
// 0.875: {
//     hashingTF_ac8be8d5806b-numFeatures: 100,
//     logreg_9f79de6e51ec-regParam: 0.01
// }

ref:
https://stackoverflow.com/questions/31749593/how-to-extract-best-parameters-from-a-crossvalidatormodel
https://alvinalexander.com/scala/how-sort-scala-sequences-seq-list-array-buffer-vector-ordering-ordered

Spark SQL cookbook (Scala)

Scala is the first-class language for interacting with Apache Spark, but it's difficult to learn. This article is mostly about operating on DataFrames and Datasets with Spark SQL.

Access SparkSession

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val conf = new SparkConf()
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

implicit val spark: SparkSession = SparkSession
  .builder()
  .appName("PersonalizedRanker")
  .config(conf)
  .getOrCreate()

val sc = spark.sparkContext
sc.setLogLevel("WARN")

Before Spark 2.0:

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

val conf: SparkConf = new SparkConf()
  .setAppName("PersonalizedRanker")
  .setMaster("local[*]")

val sc: SparkContext = new SparkContext(conf)

ref:
https://sparkour.urizone.net/recipes/understanding-sparksession/
https://spark.apache.org/docs/latest/configuration.html

Import Implicits

val spark = SparkSession.builder().getOrCreate()

import spark.implicits._

// you could also access SparkSession via any Dataset or DataFrame
import someDS.sparkSession.implicits._
import someDF.sparkSession.implicits._

val ratingDF = rawDF
  .selectExpr("from_user_id AS user", "repo_id AS item", "1 AS rating", "starred_at")
  .orderBy($"user", $"starred_at".desc)

Read Data from MySQL

import java.util.Properties

val dbUrl = "jdbc:mysql://mysql:3306/albedo?user=root&password=123&verifyServerCertificate=false&useSSL=false"
val prop = new Properties()
prop.put("driver", "com.mysql.jdbc.Driver")
val rawDF = spark.read.jdbc(dbUrl, "app_repostarring", prop)

ref:
https://docs.databricks.com/spark/latest/data-sources/sql-databases.html

Read Data from an Apache Parquet File

import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, SparkSession}

import spark.implicits._

case class RepoStarring(
  user_id: Int,
  repo_id: Int,
  starred_at: java.sql.Timestamp,
  starring: Double
)

val dataDir = "."
val today = "20170820"

val savePath = s"$dataDir/spark-data/$today/repoStarringDF.parquet"
val df: DataFrame = try {
  spark.read.parquet(savePath)
} catch {
  case e: AnalysisException => {
    if (e.getMessage().contains("Path does not exist")) {
      val df = spark.read.jdbc(dbUrl, "app_repostarring", prop) // dbUrl and prop as in "Read Data from MySQL" above
        .select("user_id", "repo_id", "starred_at")
        .withColumn("starring", lit(1.0))
      df.write.mode("overwrite").parquet(savePath)
      df
    } else {
      throw e
    }
  }
}
df.as[RepoStarring]

Read Data from an Apache Avro File

$ spark-submit --packages "com.databricks:spark-avro_2.11:3.2.0"
import com.databricks.spark.avro._
import org.apache.spark.sql.SparkSession

implicit val spark = SparkSession
  .builder()
  .appName("Word2VecTrainer")
  .getOrCreate()

val df = spark.read.avro("./githubarchive_repo_info.avro")
df.show()

ref:
https://github.com/databricks/spark-avro

Create an RDD from a List of a Certain Case Class

import org.apache.spark.rdd.RDD

case class WikipediaArticle(title: String, text: String) {
  def mentionsLanguage(lang: String): Boolean = text.split(' ').contains(lang)
}

val wikiRdd: RDD[WikipediaArticle] = sc.parallelize(Seq(
  WikipediaArticle("a1", "abc Scala xyz"),
  WikipediaArticle("a2", "abc Python xyz"),
  WikipediaArticle("a3", "abc Scala xyz"),
  WikipediaArticle("a4", "abc Scala xyz")
))

Get a Subset of an RDD

val subsetArr = someRDD.take(1000)
val subsetRDD = sc.parallelize(subsetArr)

Create a DataFrame from a Seq

import org.apache.spark.sql.functions._

val df = spark.createDataFrame(Seq(
    (1, 1, 1, "2017-05-16 20:01:00.0"),
    (1, 2, 1, "2017-05-17 21:01:00.0"),
    (2, 1, 0, "2017-05-18 22:01:00.0"),
    (3, 1, 1, "2017-05-10 22:01:00.0")
  ))
  .toDF("user", "item", "star", "starred_at")
  .withColumn("starred_at", unix_timestamp(col("starred_at")).cast("timestamp"))
// you could override the schema
// val df = spark.createDataFrame(df.rdd, starringSchema)

case class Starring(user: Int, item: Int, star: Int, starred_at: java.sql.Timestamp)
val ds = df.as[Starring]

df.printSchema()
// root
// |-- user: integer (nullable = false)
// |-- item: integer (nullable = false)
// |-- star: integer (nullable = false)
// |-- starred_at: timestamp (nullable = false)

ref:
https://medium.com/@mrpowers/manually-creating-spark-dataframes-b14dae906393

Create a Dataset

import spark.implicits._

val ds1 = spark.read.json("people.json").as[Person]
val ds2 = df.toDS
val ds3 = rdd.toDS

ref:
https://www.coursera.org/learn/scala-spark-big-data/lecture/yrfPh/datasets

Statistics

repoInfoDS.count()
// 16000

repoInfoDS
  .describe("language", "size", "stargazers_count", "forks_count", "subscribers_count", "open_issues_count")
  .show()
// +-------+--------+-----------------+-----------------+----------------+-----------------+-----------------+
// |summary|language|             size| stargazers_count|     forks_count|subscribers_count|open_issues_count|
// +-------+--------+-----------------+-----------------+----------------+-----------------+-----------------+
// |  count|   16000|            16000|            16000|           16000|            16000|            16000|
// |   mean|    null|    14777.8928125|      238.5754375|       61.164375|        20.802375|         9.896625|
// | stddev|    null|133926.8365289536|1206.220336641683|394.950236056925|84.09955924587845|56.72585435688847|
// |    min|   ANTLR|                0|                2|               0|                0|                0|
// |    max|    XSLT|          9427027|            56966|           28338|             4430|             3503|
// +-------+--------+-----------------+-----------------+----------------+-----------------+-----------------+

repoInfoDS.stat.corr("stargazers_count", "forks_count")
// 0.8408082958003276

repoInfoDS.stat.corr("stargazers_count", "size")
// 0.05351197356230549

repoInfoDS.stat.freqItems(Seq("language"), 0.1).show(truncate=false)
// +-----------------------------------------------------------+
// |language_freqItems                                         |
// +-----------------------------------------------------------+
// |[Ruby, Objective-C, C, Go, JavaScript, Swift, Java, Python]|
// +-----------------------------------------------------------+

ref:
https://databricks.com/blog/2015/06/02/statistical-and-mathematical-functions-with-dataframes-in-spark.html

Generate a Schema from a List of Column Names

import org.apache.spark.sql.types._

def generateSchema(columnNames: List[String]): StructType = {
  val fields = columnNames.map({
    case columnName: String if columnName == "tucaseid" => {
      StructField(columnName, StringType, nullable = false)
    }
    case columnName => {
      StructField(columnName, DoubleType, nullable = false)
    }
  })
  StructType(fields.toList)
}

val columnNames = List("tucaseid", "gemetsta", "gtmetsta", "peeduca", "pehspnon", "ptdtrace", "teage")
val schema = generateSchema(columnNames)

Generate a Schema from a Case Class

import java.sql.Timestamp
import org.apache.spark.sql.Encoders

case class Starring(user: Int, item: Int, star: Int, starred_at: Timestamp)
val starringSchema = Encoders.product[Starring].schema

ref:
https://stackoverflow.com/questions/36746055/generate-a-spark-structtype-schema-from-a-case-class

Change a DataFrame's Schema

val newDF = spark.createDataFrame(oldDF.rdd, starringSchema)

Print the Vector Length of Specific Columns

import org.apache.spark.ml.linalg.Vector 

userProfileDF.select("languages_preferences_w2v", "clean_bio_w2v")
  .head
  .toSeq.foreach((field: Any) => {
    println(field.asInstanceOf[Vector].size)
  })

Convert a DataFrame into a Dataset

import java.sql.Timestamp
import org.apache.spark.ml.feature.SQLTransformer

case class Starring(user: Int, item: Int, star: Int, starred_at: Timestamp)

val starringBuilder = new SQLTransformer()
val starringSQL = """
SELECT from_user_id AS user, repo_id AS item, 1 AS star, starred_at
FROM __THIS__
ORDER BY user, starred_at DESC
"""
starringBuilder.setStatement(starringSQL)
val starringDS = starringBuilder.transform(rawDF).as[Starring]

Convert a Dataset into a PairRDD

val pairRDD = dataset
  .select("user", "item")
  .rdd
  .map(row => (row(0), row(1)))

// or

case class UserItemPair(user: Int, item: Int)

val pairRDD = dataset
  .select("user", "item").as[UserItemPair]
  .rdd
  .map(row => (row.user, row.item))

// or

import org.apache.spark.sql.Row

val pairRDD = dataset
  .select("user", "item")
  .rdd
  .map({
    case Row(user: Int, item: Int) => (user, item)
  })

ref:
https://stackoverflow.com/questions/30655914/map-rdd-to-pairrdd-in-scala

Convert a Dataset into a List or Set

val popularRepos = popularReposDS
  .select("item")
  .rdd
  .map(r => r(0).asInstanceOf[Int])
  .collect()
  .to[List]

// or

val popularRepos = popularReposDS
  .select($"repo_id".as[Int])
  .collect()
  .to[List]

// List(98837133, 97071697, 17439026, ...)

ref:
https://stackoverflow.com/questions/32000646/extract-column-values-of-dataframe-as-list-in-apache-spark

Add a Column to a DataFrame

import org.apache.spark.sql.functions.lit

val newDF = df.withColumn("starring", lit(1))

Repeat withColumn()

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lower}

val booleanColumnNames = Array("fork", "has_issues", "has_projects", "has_downloads", "has_wiki", "has_pages")
val convertedRepoInfoDF = booleanColumnNames.foldLeft[DataFrame](cleanRepoInfoDF)(
  (accDF, columnName) => accDF.withColumn(s"clean_$columnName", col(columnName).cast("int"))
)

// or

val lowerableColumnNames = Array("description", "language", "topics")
val booleanColumnNames = Array("fork", "has_issues", "has_projects", "has_downloads", "has_wiki", "has_pages")
val convertedRepoInfoDF = (lowerableColumnNames ++ booleanColumnNames).foldLeft[DataFrame](cleanRepoInfoDF)((accDF, columnName) => {
  columnName match {
    case _ if lowerableColumnNames.contains(columnName) => 
      accDF.withColumn(s"clean_$columnName", lower(col(columnName)))
    case _ if booleanColumnNames.contains(columnName) => 
      accDF.withColumn(s"clean_$columnName", col(columnName).cast("int"))
  }
})

ref:
https://stackoverflow.com/questions/41400504/spark-scala-repeated-calls-to-withcolumn-using-the-same-function-on-multiple-c

Flag Rows that Have Null in Any of the Specified Columns

import org.apache.spark.sql.functions.when

val nullableColumnNames = Array("bio", "blog", "company", "email", "location", "name")

val df2 = df1.withColumn("has_null", when(nullableColumnNames.map(df1(_).isNull).reduce(_||_), true).otherwise(false))
// or
val df2 = df1.withColumn("has_null", when($"bio".isNull or
                                          $"blog".isNull or
                                          $"company".isNull or
                                          $"email".isNull or
                                          $"location".isNull or
                                          $"name".isNull, true).otherwise(false))

Combine Two Columns into a Struct Column

import org.apache.spark.sql.functions._

fixRepoInfoDS
  .where("topics != ''")
  .withColumn("tags", struct("language", "topics"))
  .select("tags")
  .show(truncate=false)
// +----------------------------------------------------------------------+
// |tags                                                                  |
// +----------------------------------------------------------------------+
// |[Ruby,aasm,activerecord,mongoid,ruby,state-machine,transition]        |
// |[Ruby,captcha,rails,recaptcha,ruby,sinatra]                           |
// |[Python,commandline,gtd,python,sqlite,todo]                           |
// ...
// +----------------------------------------------------------------------+

Select Nested Column Directly

userRecommendationsDF.show(false)
+----+-----------------------------------------------------------------+
|user|recommendations                                                  |
+----+-----------------------------------------------------------------+
|1   |[[2,0.9997897], [4,0.9988761], [1,0.9988487], [5,0.0]]           |
|3   |[[5,0.9984464], [1,2.9802322E-8], [2,0.0], [4,0.0]]              |
|2   |[[4,0.9921428], [2,0.10759391], [1,0.10749264], [5,1.4901161E-8]]|
+----+-----------------------------------------------------------------+

userRecommendationsDF.printSchema()
// root
// |-- user: integer (nullable = false)
// |-- recommendations: array (nullable = true)
// |    |-- element: struct (containsNull = true)
// |    |    |-- item: integer (nullable = true)
// |    |    |-- rating: float (nullable = true)

val userItemsDF = userRecommendationsDF.select($"user", $"recommendations.item")
// +----+------------+
// |user|        item|
// +----+------------+
// |   1|[2, 4, 1, 5]|
// |   3|[5, 1, 2, 4]|
// |   2|[4, 2, 1, 5]|
// +----+------------+

ref:
https://stackoverflow.com/questions/38413040/spark-sql-udf-with-complex-input-parameter

Flatten an Array Column

import org.apache.spark.sql.Row

val userRecommenderItemDF = alsModel.recommendForAllUsers(15)
  .flatMap((row: Row) => {
    val userID = row.getInt(0)
    val recommendations = row.getSeq[Row](1)
    recommendations.map {
      case Row(repoID: Int, score: Float) => {
        (userID, repoID, score, "als")
      }
    }
  })
  .toDF("user_id", "repo_id", "score", "source")

userRecommenderItemDF.show()
// +-------+-------+-------------+------+
// |user_id|repo_id|score        |source|
// +-------+-------+-------------+------+
// |2142   |48     |0.05021151   |als   |
// |2142   |118    |0.05021151   |als   |
// |2142   |68     |0.7791124    |als   |
// |2142   |98     |0.9939307    |als   |
// |2142   |28     |0.53719014   |als   |
// +-------+-------+-------------+------+

Define Conditional Columns

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._

val workingStatusProjection: Column = when($"telfs" >= 1 && $"telfs" < 3, "working").
                                      otherwise("not working").
                                      as("working")

val ageProjection: Column = when($"teage" >= 15 && $"teage" <= 22 , "young").
                            when($"teage" >= 23 && $"teage" <= 55 , "active").
                            otherwise("elder").
                            as("age")

val workProjection: Column = workColumns.reduce(_+_).divide(60).as("work_hours")

val newDF = df.select(workingStatusProjection, ageProjection, workProjection)
// +-----------+------+------------------+
// |    working|   age|        work_hours|
// +-----------+------+------------------+
// |    working| elder|               0.0|
// |    working|active|               0.0|
// |    working|active|               0.0|
// |not working|active|               2.0|
// |    working|active| 8.583333333333334|
// +-----------+------+------------------+

Create a Custom Transformation Function for any DataFrame

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.lit

def withGreeting(df: DataFrame): DataFrame = {
  df.withColumn("greeting", lit("hello world"))
}

def withCat(name: String)(df: DataFrame): DataFrame = {
  df.withColumn("cat", lit(s"$name meow"))
}

val df = Seq(
  "funny",
  "person"
).toDF("something")

val weirdDf = df
  .transform(withGreeting)
  .transform(withCat("kitten"))
// +---------+-----------+-----------+
// |something|   greeting|        cat|
// +---------+-----------+-----------+
// |    funny|hello world|kitten meow|
// |   person|hello world|kitten meow|
// +---------+-----------+-----------+

ref:
https://medium.com/@mrpowers/chaining-custom-dataframe-transformations-in-spark-a39e315f903c

Combine Two Arrays from Two DataFrames

import org.apache.spark.sql.Row

// an alternative to the rdd.map() inside RankingEvaluator above; $(k) is the evaluator's k Param
val bothItemsRDD = userPredictedItemsDF.join(userActualItemsDF, Seq("user_id"))
  .select(userPredictedItemsDF.col("items"), userActualItemsDF.col("items"))
  .rdd
  .map({
    case Row(userPredictedItems: Seq[Int], userActualItems: Seq[Int]) => {
      (userPredictedItems.slice(0, $(k)).toArray, userActualItems.slice(0, $(k)).toArray)
    }
  })

ref:
https://stackoverflow.com/questions/28166555/how-to-convert-row-of-a-scala-dataframe-into-case-class-most-efficiently

Handle Missing Values (NaN and Null)

NaN stands for "Not a Number". It's usually the result of a mathematical operation that doesn't make sense, e.g. 0.0/0.0.

val df1 = repoInfoDF.na.fill("")
val df2 = repoInfoDF.na.fill("", Seq("description", "homepage"))
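The same na functions also handle NaN in numeric columns, and isnan()/isNull can be used to inspect rows first. A small sketch, assuming repoInfoDF has a hypothetical numeric score column:

import org.apache.spark.sql.functions.{col, isnan}

// rows where the numeric column is NaN or null
val badRowsDF = repoInfoDF.where(isnan(col("score")) || col("score").isNull)

// replace NaN/null in a numeric column, or drop rows with missing values in specific columns
val df3 = repoInfoDF.na.fill(0.0, Seq("score"))
val df4 = repoInfoDF.na.drop(Seq("description", "homepage"))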

ref:
https://stackoverflow.com/questions/33089781/spark-dataframe-handing-empty-string-in-onehotencoder
https://stackoverflow.com/questions/43882699/which-differences-there-are-between-null-and-nan-in-spark-how-to-deal-with

Calculate the Difference between Two Dates

import org.apache.spark.sql.functions._

df.withColumn("updated_at_days_since_today", datediff(current_date(), $"updated_at"))

ref:
https://databricks.com/blog/2015/09/16/apache-spark-1-5-dataframe-api-highlights.html

Create a User-Defined Function (UDF) which Accepts One Column

import org.apache.spark.sql.functions.udf

val df = Seq(
  (1, "Company Name Inc."),
  (2, "Company Name Co."), 
  (3, "Company Name ")
).toDF("id", "company")

// note: String.replaceAll() treats its first argument as a regex, so "." here matches any
// character; escape it (e.g. "\\.com") if you need a strict literal match
val removeUninformativeWords = (text: String) => {
  text
    .toLowerCase()
    .replaceAll(", inc.", "")
    .replaceAll(" inc.", "")
    .replaceAll("-inc", "")
    .replaceAll(" co., ltd.", "")
    .replaceAll(" co.", "")
    .replaceAll(" ltd.", "")
    .replaceAll(".com", "")
    .replaceAll("@", "")
    .trim()
}
val removeUninformativeWordsUDF = udf(removeUninformativeWords)

val newDF = df.withColumn("fix_company", removeUninformativeWordsUDF($"company"))
newDF.show()

ref:
https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-sql-udfs.html

Create a User-Defined Function (UDF) which Accepts Multiple Columns

When defining your UDFs, you must use Row to map a StructType instead of any custom case class.

import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.Row

userRecommendationsDF.printSchema()
// root
// |-- user: integer (nullable = false)
// |-- recommendations: array (nullable = true)
// |    |-- element: struct (containsNull = true)
// |    |    |-- item: integer (nullable = true)
// |    |    |-- rating: float (nullable = true)

val extractItemsUDF = udf((user: Int, recommendations: Seq[Row]) => {
  val items: Seq[Int] = recommendations.map(_.getInt(0))
  // ratings is extracted only to show how to read the second struct field; it's not used below
  val ratings: Seq[Float] = recommendations.map(_.getFloat(1))
  items
})
userRecommendationsDF.withColumn("items", extractItemsUDF($"user", $"recommendations"))
// +----+-----------------------------------------------------------------+------------+
// |user|recommendations                                                  |items       |
// +----+-----------------------------------------------------------------+------------+
// |1   |[[2,0.9997897], [4,0.9988761], [1,0.9988487], [5,0.0]]           |[2, 4, 1, 5]|
// |3   |[[5,0.9984464], [1,2.9802322E-8], [2,0.0], [4,0.0]]              |[5, 1, 2, 4]|
// |2   |[[4,0.9921428], [2,0.10759391], [1,0.10749264], [5,1.4901161E-8]]|[4, 2, 1, 5]|
// +----+-----------------------------------------------------------------+------------+
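If you need both derived columns at once, one option is to return a tuple from the UDF; Spark encodes the tuple as a struct with fields _1 and _2 that you can select afterwards. A sketch under that assumption:

val extractItemsAndRatingsUDF = udf((recommendations: Seq[Row]) => {
  (recommendations.map(_.getInt(0)), recommendations.map(_.getFloat(1)))
})

userRecommendationsDF
  .withColumn("extracted", extractItemsAndRatingsUDF($"recommendations"))
  .select($"user", $"extracted._1".alias("items"), $"extracted._2".alias("ratings"))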

ref:
https://stackoverflow.com/questions/32196207/derive-multiple-columns-from-a-single-column-in-a-spark-dataframe

The GenericRowWithSchema cannot be cast to XXX issue:

There is a strict mapping between Spark SQL data types and Scala types, such as IntegerType vs. Int, TimestampType vs. java.sql.Timestamp and StructType vs. org.apache.spark.sql.Row.
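A minimal sketch of what that implies, with a hypothetical Rec case class: mapping the struct elements to a case class inside a UDF triggers the cast error at runtime, while pattern matching on Row works:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf

case class Rec(item: Int, rating: Float)

// would fail with "GenericRowWithSchema cannot be cast to Rec",
// because each struct element arrives as a Row, not a Rec:
// val badUDF = udf((recommendations: Seq[Rec]) => recommendations.map(_.item))

// works: pattern match on Row and read the struct fields positionally
val goodUDF = udf((recommendations: Seq[Row]) => recommendations.map {
  case Row(item: Int, rating: Float) => item
})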

ref:
https://stackoverflow.com/questions/38413040/spark-sql-udf-with-complex-input-parameter
https://spark.apache.org/docs/latest/sql-programming-guide.html#data-types

Create a User-Defined Function (UDF) with Extra Parameters

import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions._
import scala.util.control.Breaks._

val df = spark.createDataFrame(Seq(
  ("Bob", 35, "Web Backend Developer"),
  ("Cathy", 24, "UX Designer"),
  ("Vic", 26, "PM"),
  ("Tom", 29, "Back end Engineer")
)).toDF("name", "age", "occupation")

// the return type of this UDF is Double
def containsAnyOfUDF(substrings: Array[String]): UserDefinedFunction = udf[Double, String]((text: String) => {
  var result = 0.0
  breakable {
    for (substring <- substrings) {
      if (text.contains(substring)) {
        result = 1.0
        break
      }
    }
  }
  result
})

val backends = Array("backend", "back end")
df.withColumn("knows_backend", containsAnyOfUDF(backends)(lower($"occupation"))))
// +-----+---+---------------------+-------------+
// |name |age|occupation           |knows_backend|
// +-----+---+---------------------+-------------+
// |Bob  |35 |Web Backend Developer|1.0          |
// |Cathy|24 |UX Designer          |0.0          |
// |Vic  |26 |PM                   |0.0          |
// |Tom  |29 |Back end Engineer    |1.0          |
// +-----+---+---------------------+-------------+

ref:
https://stackoverflow.com/questions/35546576/how-can-i-pass-extra-parameters-to-udfs-in-sparksql

Select Rows that Contain a Certain String in a DataFrame

userInfoDF
  .where("company LIKE '%Inc%'")
  .orderBy($"company".desc)

Order By One of the Elements in a List Column

import org.apache.spark.ml.linalg.{Vector, Vectors}

val to_array = udf((v: Vector) => v.toDense.values)

resultTestDF
  .where("user_id = 652070")
  .orderBy(to_array($"probability").getItem(1).desc)
  .select("user_id", "repo_id", "starring", "prediction", "probability")
  .show(false)

Show Distinct Values

df.select($"company").distinct().orderBy($"company".desc)

import org.apache.spark.sql.functions.approx_count_distinct

imputedUserInfoDF.select(approx_count_distinct($"company")).show()
// 39581
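approx_count_distinct() trades a little accuracy for speed; if you need the exact number, countDistinct() works the same way but is slower on large datasets:

import org.apache.spark.sql.functions.countDistinct

imputedUserInfoDF.select(countDistinct($"company")).show()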

ref:
https://databricks.com/blog/2016/05/19/approximate-algorithms-in-apache-spark-hyperloglog-and-quantiles.html

Group By then Aggregation

import org.apache.spark.sql.functions.count

val df = userInfoDF
  .groupBy($"company")
  .agg(count("user_id").alias("count"))
  .orderBy($"count".desc)
  .limit(1000)
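agg() also accepts several aggregations at once, so related statistics can be computed in a single pass. A small sketch, assuming userInfoDF also has a hypothetical numeric followers column:

import org.apache.spark.sql.functions.{avg, count}

val companyStatsDF = userInfoDF
  .groupBy($"company")
  .agg(
    count("user_id").alias("count"),
    avg("followers").alias("avg_followers") // followers is a hypothetical numeric column
  )
  .orderBy($"count".desc)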

Group By then Get First/Top n Items

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{collect_list, rank}

val topN = 10
val window = Window.partitionBy($"user_id").orderBy($"starred_at".desc)
val userLanguagesDF = repoInfoStarringDF
  .withColumn("rank", rank.over(window))
  .where($"rank" <= topN)
  .groupBy($"user_id")
  .agg(collect_list($"language").alias("languages"))
// +--------+-----------------------------------------------------------------+
// |user_id |languages                                                        |
// +--------+-----------------------------------------------------------------+
// |2142    |[JavaScript, Python, Python, Ruby, Go]                           |
// |59990   |[Python, JavaScript, Go, JavaScript, Go]                         |
// |101094  |[JavaScript, C++, Max, C, JavaScript]                            |
// |109622  |[PHP, PHP, PHP, PHP, Python]                                     |
// |201694  |[TypeScript, Python, Vim script, Vim script, Python]             |
// +--------+-----------------------------------------------------------------+

ref:
https://stackoverflow.com/questions/33655467/get-topn-of-all-groups-after-group-by-using-spark-dataframe
https://stackoverflow.com/questions/35918262/spark-topn-after-groupby

or

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

val df = spark.createDataFrame(Seq(
  (2, 1, 0, "2017-05-18 22:01:00.0"),
  (1, 1, 1, "2017-05-16 20:01:00.0"),
  (2, 5, 0, "2017-05-21 22:01:00.0"),
  (1, 2, 1, "2017-05-17 21:01:00.0"),
  (1, 4, 1, "2017-05-17 22:01:00.0"),
  (1, 7, 1, "2017-05-12 22:01:00.0"),
  (3, 1, 1, "2017-05-19 22:01:00.0"),
  (3, 2, 1, "2017-05-30 22:01:00.0"),
  (3, 5, 1, "2017-05-01 22:01:00.0"),
  (1, 6, 1, "2017-05-19 22:01:00.0")
)).toDF("user_id", "repo_id", "starring", "starred_at")

val windowSpec = Window.partitionBy($"user_id").orderBy($"starred_at".desc)
val userActualItemsDF = df
  .withColumn("row_number", row_number().over(windowSpec))
  .where($"row_number" <= 3)
// +-------+-------+--------+---------------------+----------+
// |user_id|repo_id|starring|starred_at           |row_number|
// +-------+-------+--------+---------------------+----------+
// |1      |6      |1       |2017-05-19 22:01:00.0|1         |
// |1      |4      |1       |2017-05-17 22:01:00.0|2         |
// |1      |2      |1       |2017-05-17 21:01:00.0|3         |
// |3      |2      |1       |2017-05-30 22:01:00.0|1         |
// |3      |1      |1       |2017-05-19 22:01:00.0|2         |
// |3      |5      |1       |2017-05-01 22:01:00.0|3         |
// |2      |5      |0       |2017-05-21 22:01:00.0|1         |
// |2      |1      |0       |2017-05-18 22:01:00.0|2         |
// +-------+-------+--------+---------------------+----------+

ref:
https://stackoverflow.com/questions/33878370/how-to-select-the-first-row-of-each-group
http://xinhstechblog.blogspot.tw/2016/04/spark-window-functions-for-dataframes.html

Group By then Concatenate Strings

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val userTopicsDF = repoInfoStarringDF
  .where($"topics" =!= "")
  .withColumn("rank", rank.over(Window.partitionBy($"user_id").orderBy($"starred_at".desc)))
  .where($"rank" <= 50)
  .groupBy($"user_id")
  .agg(concat_ws(",", collect_list($"topics")).alias("topics_preferences"))
// +--------+-------------------------------------------------------------------+
// | user_id|                                                 topics_preferences|
// +--------+-------------------------------------------------------------------+
// |    2142|go,golang,grpc,microservice,protobuf,consul,go,golang,load-balancer|
// |   59990|api,api-gateway,aws,aws-infrastructure,aws-lambda,deploy-tool      |
// |  101094|ableton,javascript,maxmsp,c,c89,gui,imgui,nuklear                  |
// |  109622|stripe,algolia,magento,magento-algoliasearch,php,search,aws        |
// +--------+-------------------------------------------------------------------+

ref:
https://community.hortonworks.com/questions/44886/dataframe-groupby-and-concat-non-empty-strings.html

Group By after Order By Maintains Order

val list = Seq(
  (2, 1, 0, "2017-05-18 22:01:00.0"),
  (1, 1, 1, "2017-05-16 20:01:00.0"),
  (2, 5, 0, "2017-05-21 22:01:00.0"),
  (1, 2, 1, "2017-05-17 21:01:00.0"),
  (1, 4, 1, "2017-05-17 22:01:00.0"),
  (3, 1, 1, "2017-05-19 22:01:00.0"),
  (3, 2, 1, "2017-05-30 22:01:00.0"),
  (3, 5, 1, "2017-05-01 22:01:00.0"),
  (1, 6, 1, "2017-05-19 22:01:00.0")
)
val df = list.toDF("user_id", "repo_id", "starring", "starred_at")

val userActualItemsDF = df
  .orderBy($"starred_at".desc)
  .groupBy($"user_id")
  .agg(collect_list($"repo_id").alias("items"))
// +-------+------------+
// |user_id|       items|
// +-------+------------+
// |      1|[6, 4, 2, 1]|
// |      3|   [2, 1, 5]|
// |      2|      [5, 1]|
// +-------+------------+

ref:
https://stackoverflow.com/questions/39505599/spark-dataframe-does-groupby-after-orderby-maintain-that-order

collect_list Multiple Columns

import org.apache.spark.sql.functions.{collect_list, struct}

predictionDF
// +----+----+------+----------+
// |user|item|rating|prediction|
// +----+----+------+----------+
// |   1|   1|    12| 0.9988487|
// |   3|   5|     8| 0.9984464|
// |   1|   4|     4|0.99887615|
// |   2|   4|     1| 0.9921428|
// |   1|   2|    90| 0.9997897|
// +----+----+------+----------+

val userRecommendationsDF = predictionDF
  .groupBy($"user")
  .agg(collect_list(struct($"item", $"prediction")).alias("recommendations"))
// +----+----------------------------------------------+
// |user|recommendations                               |
// +----+----------------------------------------------+
// |1   |[[1,0.9988487], [4,0.99887615], [2,0.9997897]]|
// |3   |[[5,0.9984464]]                               |
// |2   |[[4,0.9921428]]                               |
// +----+----------------------------------------------+
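If the collected recommendations should also be ordered inside each list, one trick is to put the sort key first in the struct and use sort_array(); structs compare field by field, so this sorts by prediction. A sketch:

import org.apache.spark.sql.functions.{collect_list, sort_array, struct}

val sortedRecommendationsDF = predictionDF
  .groupBy($"user")
  .agg(sort_array(collect_list(struct($"prediction", $"item")), asc = false).alias("recommendations"))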

groupByKey() on Dataset

val keyValues = List(
  (3, "Me"), (1, "Thi"), (2, "Se"), (3, "ssa"), (1, "sIsA"), (3, "ge:"), (3, "-"), (2, "cre"), (2, "t")
)
val keyValuesDS = keyValues.toDS
val ds = keyValuesDS
  .groupByKey(p => p._1)
  .mapValues(p => p._2)
  .reduceGroups((acc, str) => acc + str)

ref:
https://www.coursera.org/learn/scala-spark-big-data/lecture/yrfPh/datasets

case class TimeUsageRow(
  working: String,
  sex: String,
  age: String,
  primaryNeeds: Double,
  work: Double,
  other: Double
)

val summaryDs = doShit()
// +-----------+------+------+------------------+------------------+------------------+
// |    working|   sex|   age|      primaryNeeds|              work|             other|
// +-----------+------+------+------------------+------------------+------------------+
// |    working|  male| elder|             15.25|               0.0|              8.75|
// |    working|female|active|13.833333333333334|               0.0|10.166666666666666|
// |    working|female|active|11.916666666666666|               0.0|12.083333333333334|
// |not working|female|active|13.083333333333334|               2.0| 8.916666666666666|
// |    working|  male|active|11.783333333333333| 8.583333333333334|3.6333333333333333|
// +-----------+------+------+------------------+------------------+------------------+

import org.apache.spark.sql.Encoders
import org.apache.spark.sql.expressions.scalalang.typed
import org.apache.spark.sql.functions.round

val finalDs = summaryDs
  .groupByKey((row: TimeUsageRow) => {
    (row.working, row.sex, row.age)
  })
  .agg(
    round(typed.avg[TimeUsageRow](_.primaryNeeds), 1).as(Encoders.DOUBLE),
    round(typed.avg[TimeUsageRow](_.work), 1).as(Encoders.DOUBLE),
    round(typed.avg[TimeUsageRow](_.other), 1).as(Encoders.DOUBLE)
  )
  .map({
    case ((working, sex, age), primaryNeeds, work, other) => {
      TimeUsageRow(working, sex, age, primaryNeeds, work, other)
    }
  })
  .orderBy("working", "sex", "age")
  .as[TimeUsageRow]
// +-----------+------+------+------------+----+-----+
// |    working|   sex|   age|primaryNeeds|work|other|
// +-----------+------+------+------------+----+-----+
// |not working|female|active|        12.4| 0.5| 10.8|
// |not working|female| elder|        10.9| 0.4| 12.4|
// |not working|female| young|        12.5| 0.2| 11.1|
// |not working|  male|active|        11.4| 0.9| 11.4|
// |not working|  male| elder|        10.7| 0.7| 12.3|
// +-----------+------+------+------------+----+-----+

ref:
https://stackoverflow.com/questions/39946210/spark-2-0-datasets-groupbykey-and-divide-operation-and-type-safety

Aggregator

val myAgg = new Aggregator[IN, BUF, OUT] {
  def zero: BUF = ??? // the initial value
  def reduce(b: BUF, a: IN): BUF = ??? // add an element to the running total
  def merge(b1: BUF, b2: BUF): BUF = ??? // merge intermediate values
  def finish(b: BUF): OUT = ??? // return the final result
  override def bufferEncoder: Encoder[BUF] = ???
  override def outputEncoder: Encoder[OUT] = ???
}.toColumn
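A minimal concrete sketch of the same skeleton: an Aggregator that averages Double values (the ratingsDS Dataset[Double] in the usage comment is hypothetical):

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

val avgRating = new Aggregator[Double, (Double, Long), Double] {
  def zero: (Double, Long) = (0.0, 0L)
  def reduce(b: (Double, Long), a: Double): (Double, Long) = (b._1 + a, b._2 + 1)
  def merge(b1: (Double, Long), b2: (Double, Long)): (Double, Long) = (b1._1 + b2._1, b1._2 + b2._2)
  def finish(b: (Double, Long)): Double = if (b._2 == 0) 0.0 else b._1 / b._2
  override def bufferEncoder: Encoder[(Double, Long)] = Encoders.tuple(Encoders.scalaDouble, Encoders.scalaLong)
  override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}.toColumn

// usage: ratingsDS.select(avgRating).show()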

ref:
https://www.coursera.org/learn/scala-spark-big-data/lecture/yrfPh/datasets

Join Two DataFrames

val starringDF = spark.createDataFrame(Seq(
    (652070, 21289110, 1),
    (652070, 4164482, 1),
    (652070, 44838949, 0),
    (1912583, 4164482, 0)
)).toDF("user_id", "repo_id", "starring")

val repoDF = spark.createDataFrame(Seq(
    (21289110, "awesome-python", "Python", 37336),
    (4164482, "django", "Python", 27451),
    (44838949, "swift", "C++", 39840)
)).toDF("id", "repo_name", "repo_language", "stargazers_count")

val fullDF = starringDF.join(repoDF, starringDF.col("repo_id") === repoDF.col("id"))
// +-------+--------+--------+--------+--------------+-------------+----------------+
// |user_id| repo_id|starring|      id|     repo_name|repo_language|stargazers_count|
// +-------+--------+--------+--------+--------------+-------------+----------------+
// | 652070|21289110|       1|21289110|awesome-python|       Python|           37336|
// | 652070| 4164482|       1| 4164482|        django|       Python|           27451|
// | 652070|44838949|       0|44838949|         swift|          C++|           39840|
// |1912583| 4164482|       0| 4164482|        django|       Python|           27451|
// +-------+--------+--------+--------+--------------+-------------+----------------+
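Since the joined result keeps both repo_id and id, the duplicated column can be dropped afterwards by referencing it through its source DataFrame:

val dedupedDF = fullDF.drop(repoDF.col("id"))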

ref:
https://docs.databricks.com/spark/latest/faq/join-two-dataframes-duplicated-column.html

Broadcast Join

ref:
https://docs.cloud.databricks.com/docs/latest/databricks_guide/04%20SQL%2C%20DataFrames%20%26%20Datasets/05%20BroadcastHashJoin%20-%20scala.html
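When one side of the join is small enough to fit in memory, you can hint Spark to broadcast it to every executor and avoid a shuffle. A minimal sketch, reusing starringDF and repoDF from the previous recipe:

import org.apache.spark.sql.functions.broadcast

val fullDF = starringDF.join(broadcast(repoDF), starringDF.col("repo_id") === repoDF.col("id"))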