Build a recommender system with Spark: Logistic Regression

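In this series of articles, we use tools such as Apache Spark, XGBoost, Elasticsearch, and MySQL to build a machine learning pipeline for a recommender system. A recommender system can be roughly divided into two parts, Candidate Generation and Ranking. The former produces a set of candidate items for each user; common approaches include Collaborative Filtering, content-based methods, tag matching, popularity rankings, and manual curation. The latter orders those candidates and presents the final recommendations as a Top N list; a common approach is Logistic Regression.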

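In this article, we take Logistic Regression, one of the methods commonly used in the Ranking stage, as the example and use Apache Spark's Logistic Regression model to build a recommender system for GitHub repositories. Using users' starring records together with the attributes of users and repos as features, the model predicts whether a user will star a given repo (a classification problem). The trained model then serves as the Ranking module of our recommender system. Because LR is a linear model, it usually needs a lot of feature engineering to capture non-linear relationships, so this article focuses on Spark ML's Pipeline mechanism and on feature engineering rather than on the algorithm itself.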
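
To make the "linear model" point concrete, here is a minimal plain Scala sketch (no Spark) of how a logistic regression scores one row, and why an interaction has to be supplied as its own feature. The object name LogisticScoring and the helper withCross are made up for illustration; the weights in the usage note are arbitrary, not values learned from the GitHub data.

```scala
object LogisticScoring {
  // Standard logistic (sigmoid) function.
  def sigmoid(z: Double): Double = 1.0 / (1.0 + math.exp(-z))

  // Probability of the positive class for a linear model: sigmoid(w . x + b).
  def predict(weights: Array[Double], bias: Double, features: Array[Double]): Double = {
    require(weights.length == features.length, "weights and features must have the same length")
    val z = weights.zip(features).map { case (w, x) => w * x }.sum + bias
    sigmoid(z)
  }

  // A hand-made cross feature: the third element is 1.0 only when both binary
  // inputs are 1.0. The model can only use this interaction if it is given as an input.
  def withCross(a: Double, b: Double): Array[Double] = Array(a, b, a * b)
}
```

With weights Array(0.0, 0.0, 4.0) and bias -2.0, the score is above 0.5 only when both inputs are 1.0, which a model restricted to the two original columns could not express with these per-column weights.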

The complete code is available here: https://github.com/vinta/albedo

Submit the Application

$ spark-submit \
--master spark://192.168.10.100:7077 \
--packages "com.github.fommil.netlib:all:1.1.2,com.hankcs:hanlp:portable-1.3.4,mysql:mysql-connector-java:5.1.41" \
--class ws.vinta.albedo.UserProfileBuilder \
target/albedo-1.0.0-SNAPSHOT.jar

$ spark-submit \
--master spark://192.168.10.100:7077 \
--packages "com.github.fommil.netlib:all:1.1.2,com.hankcs:hanlp:portable-1.3.4,mysql:mysql-connector-java:5.1.41" \
--class ws.vinta.albedo.RepoProfileBuilder \
target/albedo-1.0.0-SNAPSHOT.jar

$ spark-submit \
--master spark://192.168.10.100:7077 \
--packages "com.github.fommil.netlib:all:1.1.2,com.hankcs:hanlp:portable-1.3.4,mysql:mysql-connector-java:5.1.41" \
--class ws.vinta.albedo.LogisticRegressionRanker \
target/albedo-1.0.0-SNAPSHOT.jar

ref:
https://vinta.ws/code/setup-spark-scala-and-maven-with-intellij-idea.html
https://spark.apache.org/docs/latest/submitting-applications.html
https://spoddutur.github.io/spark-notes/distribution_of_executors_cores_and_memory_for_spark_application

Load Data

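We previously used the GitHub API and the GitHub Archive dataset on BigQuery to collect 1.5 million starring records along with data on the users and repos involved. We currently have the following datasets, which largely follow the GitHub API; their fields are listed below: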

rawUserInfoDS.printSchema()
// root
 // |-- user_id: integer (nullable = true)
 // |-- user_login: string (nullable = true)
 // |-- user_account_type: string (nullable = true)
 // |-- user_name: string (nullable = true)
 // |-- user_company: string (nullable = true)
 // |-- user_blog: string (nullable = true)
 // |-- user_location: string (nullable = true)
 // |-- user_email: string (nullable = true)
 // |-- user_bio: string (nullable = true)
 // |-- user_public_repos_count: integer (nullable = true)
 // |-- user_public_gists_count: integer (nullable = true)
 // |-- user_followers_count: integer (nullable = true)
 // |-- user_following_count: integer (nullable = true)
 // |-- user_created_at: timestamp (nullable = true)
 // |-- user_updated_at: timestamp (nullable = true)

rawRepoInfoDS.printSchema()
// root
 // |-- repo_id: integer (nullable = true)
 // |-- repo_owner_id: integer (nullable = true)
 // |-- repo_owner_username: string (nullable = true)
 // |-- repo_owner_type: string (nullable = true)
 // |-- repo_name: string (nullable = true)
 // |-- repo_full_name: string (nullable = true)
 // |-- repo_description: string (nullable = true)
 // |-- repo_language: string (nullable = true)
 // |-- repo_created_at: timestamp (nullable = true)
 // |-- repo_updated_at: timestamp (nullable = true)
 // |-- repo_pushed_at: timestamp (nullable = true)
 // |-- repo_homepage: string (nullable = true)
 // |-- repo_size: integer (nullable = true)
 // |-- repo_stargazers_count: integer (nullable = true)
 // |-- repo_forks_count: integer (nullable = true)
 // |-- repo_subscribers_count: integer (nullable = true)
 // |-- repo_is_fork: boolean (nullable = true)
 // |-- repo_has_issues: boolean (nullable = true)
 // |-- repo_has_projects: boolean (nullable = true)
 // |-- repo_has_downloads: boolean (nullable = true)
 // |-- repo_has_wiki: boolean (nullable = true)
 // |-- repo_has_pages: boolean (nullable = true)
 // |-- repo_open_issues_count: integer (nullable = true)
 // |-- repo_topics: string (nullable = true)

rawStarringDS.printSchema()
// root
 // |-- user_id: integer (nullable = true)
 // |-- repo_id: integer (nullable = true)
 // |-- starred_at: timestamp (nullable = true)
 // |-- starring: double (nullable = true)

ref:
https://www.githubarchive.org/
http://ghtorrent.org/

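After loading the data, the first thing to do is Exploratory Data Analysis (EDA): play around with the data you have. Apache Zeppelin and Databricks notebooks are worth trying. Besides having built-in support for every language Spark supports, they integrate with NoSQL stores and JDBC-compatible databases, and plotting charts is convenient. In practice they feel even handier than Jupyter Notebook.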

ref:
https://zeppelin.apache.org/
https://databricks.com/

Build User Profile / Item Profile

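In this project the main entities are users and repos, so we build a User Profile and an Item Profile for them, to be used as features during model training. We keep this step separate from the model training flow, which makes the overall architecture more flexible. In practice, we can use the user id or item id as the key and store the prepared features directly in Redis or another schemaless NoSQL database, so that multiple models can reuse them later. For real-time recommendations the features can also be fetched quickly, and only some of the fields need to be recomputed.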
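
The keyed feature store idea can be sketched in plain Scala as follows. InMemoryFeatureStore is a hypothetical stand-in for Redis or a similar key-value store, and the key format "entity:id" is an assumption made for this example, not something taken from the albedo project.

```scala
import scala.collection.mutable

object FeatureStoreSketch {
  // Hypothetical stand-in for Redis or another key-value store.
  final class InMemoryFeatureStore {
    private val store = mutable.Map.empty[String, Map[String, Double]]

    // Store the precomputed features of one entity under a key like "user:42".
    def put(entity: String, id: Int, features: Map[String, Double]): Unit =
      store(s"$entity:$id") = features

    // Unknown keys yield an empty feature map instead of failing.
    def get(entity: String, id: Int): Map[String, Double] =
      store.getOrElse(s"$entity:$id", Map.empty)
  }

  // Merge the two precomputed profiles into one feature map at request time.
  def assemble(store: InMemoryFeatureStore, userId: Int, repoId: Int): Map[String, Double] =
    store.get("user", userId) ++ store.get("repo", repoId)
}
```

Several models can read the same stored profiles, and a real-time service only has to recompute the fields that depend on the current request.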

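The data here comes mainly from the GitHub API, so to some extent a lot of cleaning and normalization has already been done for us. In the real world, the data your system has to handle is usually not this clean: it may come from a variety of data sources, in a variety of formats, and change over time, so Extract, Transform, Load (ETL) usually takes considerable effort. It is best to agree on the format when the logging (event tracking) is written. Also, data in a production environment changes constantly, and monitoring is a key part of keeping it fresh and fault-tolerant.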

ref:
http://www.algorithmdog.com/ad-rec-deploy
https://tech.meituan.com/online-feature-system02.html

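Because space is limited, the rest of this article covers only a few important parts. In short, at the end of this step we have two DataFrames, userProfileDF and repoProfileDF, which hold the prepared features. The full code is here: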

ref:
https://github.com/vinta/albedo/blob/master/src/main/scala/ws/vinta/albedo/UserProfileBuilder.scala
https://github.com/vinta/albedo/blob/master/src/main/scala/ws/vinta/albedo/RepoProfileBuilder.scala

Feature Engineering

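For a recommender system, features can be divided into the following four types:

  • User features: attributes of the user, such as user id, gender, occupation, or city
  • Item features: attributes of the item, such as item id, author, title, category, rating, or tags
  • Interaction features: aggregations or crosses of actions the user has taken on items, such as whether the user has watched movies of the same genre, the genre distribution of recently played songs, or how many high-priced products the user bought last week
  • Context features: metadata of an action the user takes on an item, such as when it happened, the device used, or the current GPS location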

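Some features are available at data collection time, some require extra steps (for example, an external API or another model), and some must be updated in real time. Note that because we are predicting whether a given user will star a given repo, the user in the features below can be either a repo stargazer or a repo owner.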

Raw features:

  • User features
    • user_id
    • user_login
    • user_name
    • user_email
    • user_blog
    • user_bio
    • user_company
    • user_location
    • user_followers_count
    • user_following_count
    • user_public_repos_count
    • user_public_gists_count
    • user_created_at
    • user_updated_at
  • Item features
    • repo_id
    • repo_name
    • repo_owner
    • repo_owner_type
    • repo_language
    • repo_description
    • repo_homepage
    • repo_subscribers_count
    • repo_stargazers_count
    • repo_forks_count
    • repo_size
    • repo_created_at
    • repo_updated_at
    • repo_pushed_at
    • repo_has_issues
    • repo_has_projects
    • repo_has_downloads
    • repo_has_wiki
    • repo_has_pages
    • repo_open_issues_count
    • repo_topics
  • Interaction features
    • user_stars_repo
    • user_follows_user
  • Context features
    • user_repo_starred_at

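Brainstormed features:

  • User features
    • user_days_between_created_at_today: number of days between the user's registration date and today
    • user_days_between_updated_at_today: number of days between the user's last update and today
    • user_repos_avg_stargazers_count: average number of stars across the user's own repos (excluding forks)
    • user_organizations: the organizations the user belongs to
    • user_has_null: whether at least one of the user's fields is null
    • user_has_blog: whether the user has a website
    • user_is_freelancer: whether the user's bio contains words such as Freelancer
    • user_is_junior: whether the user's bio contains words such as Beginner or Junior
    • user_is_lead: whether the user's bio contains words such as Team Lead, Architect, Creator, CTO, or VP of Engineering
    • user_is_scholar: whether the user's bio contains words such as Researcher, Scientist, PhD, or Professor
    • user_is_pm: whether the user's bio contains words such as Product Manager
    • user_knows_backend: whether the user's bio contains words such as Backend or Back end
    • user_knows_data: whether the user's bio contains words such as Machine Learning, Deep Learning, or Data Science
    • user_knows_devops: whether the user's bio contains words such as DevOps, SRE, SysAdmin, or Infrastructure
    • user_knows_frontend: whether the user's bio contains words such as Frontend or Front end
    • user_knows_mobile: whether the user's bio contains words such as Mobile, iOS, or Android
    • user_knows_recsys: whether the user's bio contains words such as Recommender System, Data Mining, or Information Retrieval
    • user_knows_web: whether the user's bio contains words such as Web Development or Fullstack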
  • Item features
    • repo_created_at_days_since_today: number of days between the repo's creation date and today
    • repo_updated_at_days_since_today: number of days between the repo's last update and today
    • repo_pushed_at_days_since_today: number of days between the repo's last push and today
    • repo_stargazers_count_in_30days: number of stars the repo received within 30 days
    • repo_subscribers_stargazers_ratio: ratio of the repo's watch count to its star count
    • repo_forks_stargazers_ratio: ratio of the repo's fork count to its star count
    • repo_open_issues_stargazers_ratio: ratio of the repo's open issue count to its star count
    • repo_releases_count: number of releases or tags of the repo
    • repo_license: the repo's license
    • repo_readme: the content of the repo's README
    • repo_has_null: whether at least one of the repo's fields is null
    • repo_has_readme: whether the repo has a README file
    • repo_has_changelog: whether the repo has a CHANGELOG file
    • repo_has_contributing: whether the repo has a CONTRIBUTING file
    • repo_has_tests: whether the repo has tests
    • repo_has_ci: whether the repo has CI
    • repo_has_dockerfile: whether the repo has a Dockerfile
    • repo_is_unmaintained: whether the repo is no longer maintained
    • repo_is_awesome: whether the repo is included in any awesome-xxx list
    • repo_is_vinta_starred: whether the repo has been starred by @vinta, the author of this article
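  • Interaction features
    • user_starred_repos_count: total number of repos the user has starred
    • user_avg_daily_starred_repos_count: average number of repos the user stars per day
    • user_forked_repos_count: total number of repos the user has forked
    • user_follower_following_count_ratio: ratio of the user's follower count to their following count
    • user_recent_searched_keywords: the user's 50 most recent search keywords
    • user_recent_commented_repos: the 50 repos the user most recently commented on
    • user_recent_watched_repos: the 50 repos the user most recently subscribed to
    • user_recent_starred_repos_descriptions: descriptions of the user's 50 most recently starred repos
    • user_recent_starred_repos_languages: languages of the user's 50 most recently starred repos
    • user_recent_starred_repos_topics: topics of the user's 50 most recently starred repos
    • user_follows_repo_owner: whether the user follows the repo's owner
    • repo_language_index_in_user_recent_repo_languages: position of the repo's language in the list of languages the user recently starred
    • repo_language_count_in_user_recent_repo_languages: number of times the repo's language appears in the list of languages the user recently starred
    • repo_topics_user_recent_topics_similarity: similarity between the repo's topics and the topics the user recently starred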
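  • Context features
    • als_model_prediction: the prediction from the ALS model, that is, the user's preference for the repo
    • gbdt_model_index: the tree index from the GBDT model, an automatically generated feature for the observation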

Common feature engineering techniques:
https://vinta.ws/code/feature-engineering.html

Detect Outliers

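Besides missing values, outliers (anomalous values) also need attention. For continuous features, a box plot reveals outliers quickly; for categorical features, run SELECT COUNT(*) ... GROUP BY and view the number of rows per category in a bar chart. Depending on the problem you are solving, outliers may be safe to ignore or may need special treatment, such as finding out why they occur: a data collection error, or some deeper underlying cause.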

ref:
https://www.analyticsvidhya.com/blog/2016/01/guide-data-exploration/
https://www.slideshare.net/tw_dsconf/123-70852901
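
What a box plot flags as an outlier can be written down directly. The sketch below is plain Scala with no Spark, uses the common 1.5 × IQR whisker rule, and computes quantiles by linear interpolation; both choices are assumptions of this example (plotting tools differ slightly in how they compute quartiles), and OutlierSketch is a made-up name.

```scala
object OutlierSketch {
  // Quantile by linear interpolation between the closest ranks of a sorted sample.
  def quantile(sorted: Vector[Double], q: Double): Double = {
    require(sorted.nonEmpty && q >= 0.0 && q <= 1.0)
    val pos = q * (sorted.length - 1)
    val lower = math.floor(pos).toInt
    val upper = math.ceil(pos).toInt
    sorted(lower) + (sorted(upper) - sorted(lower)) * (pos - lower)
  }

  // Values outside [Q1 - 1.5 * IQR, Q3 + 1.5 * IQR], the usual box plot whiskers.
  def iqrOutliers(values: Seq[Double]): Seq[Double] = {
    val sorted = values.sorted.toVector
    val q1 = quantile(sorted, 0.25)
    val q3 = quantile(sorted, 0.75)
    val iqr = q3 - q1
    values.filter(v => v < q1 - 1.5 * iqr || v > q3 + 1.5 * iqr)
  }
}
```

For a column such as a per-user starred count, a single account with tens of thousands of stars would be the kind of value this rule picks out.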

Impute Missing Values

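Use df.describe().show() to view summary statistics for each column: count, mean, stddev, min, and max. Besides using df.where("some_column IS NULL"), comparing the count of different columns quickly shows which ones have missing values. It is also worth checking whether the columns with missing values are related to the target variable.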

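Here we fill in missing values for null and NaN data directly. The columns below are all strings, so we simply replace missing values with an empty string to simplify later processing, and also create a has_null feature.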

For users:

import org.apache.spark.sql.functions._

val nullableColumnNames = Array("user_name", "user_company", "user_blog", "user_location", "user_bio")

val imputedUserInfoDF = rawUserInfoDS
  .withColumn("user_has_null", when(nullableColumnNames.map(rawUserInfoDS(_).isNull).reduce(_ || _), true).otherwise(false))
  .na.fill("", nullableColumnNames)

For repos:

import org.apache.spark.sql.functions._

val nullableColumnNames = Array("repo_description", "repo_homepage")

val imputedRepoInfoDF = rawRepoInfoDS
  .withColumn("repo_has_null", when(nullableColumnNames.map(rawRepoInfoDS(_).isNull).reduce(_ || _), true).otherwise(false))
  .na.fill("", nullableColumnNames)

ref:
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameNaFunctions

For numeric columns, consider using Imputer.

ref:
https://spark.apache.org/docs/latest/ml-features.html#imputer
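
For intuition, mean imputation (one of the strategies Imputer offers) looks roughly like this in plain Scala. This is only a sketch of the idea on a single in-memory column, not Spark's implementation; ImputeSketch and imputeMean are names invented for the example, and Option stands in for a nullable column.

```scala
object ImputeSketch {
  // Replace missing values (None or NaN) with the mean of the observed values.
  def imputeMean(values: Seq[Option[Double]]): Seq[Double] = {
    val observed = values.flatten.filterNot(_.isNaN)
    require(observed.nonEmpty, "at least one observed value is needed")
    val mean = observed.sum / observed.length
    values.map {
      case Some(v) if !v.isNaN => v
      case _ => mean
    }
  }
}
```

As with the string columns above, it can be worth keeping a has_null style flag next to the imputed value so the model can still see that the original was missing.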

Clean Data

For users, we use User-defined Functions to normalize a few text columns:

import org.apache.spark.sql.functions._

val cleanUserInfoDF = imputedUserInfoDF
  .withColumn("user_clean_company", cleanCompanyUDF($"user_company"))
  .withColumn("user_clean_location", cleanLocationUDF($"user_location"))
  .withColumn("user_clean_bio", lower($"user_bio"))

For repos, we filter out projects whose repo_stargazers_count is too high or too low, or whose description contains words such as "unmaintained" or "assignment":

val reducedRepoInfo = imputedRepoInfoDF
  .where($"repo_is_fork" === false)
  .where($"repo_forks_count" <= 90000)
  .where($"repo_stargazers_count".between(30, 100000))

val unmaintainedWords = Array("%unmaintained%", "%no longer maintained%", "%deprecated%", "%moved to%")
val assignmentWords = Array("%assignment%", "%作業%", "%作业%")
val demoWords = Array("test", "%demo project%")
val blogWords = Array("my blog")

val cleanRepoInfoDF = reducedRepoInfo
  .withColumn("repo_clean_description", lower($"repo_description"))
  .withColumn("repo_is_unmaintained", when(unmaintainedWords.map($"repo_clean_description".like(_)).reduce(_ or _), true).otherwise(false))
  .withColumn("repo_is_assignment", when(assignmentWords.map($"repo_clean_description".like(_)).reduce(_ or _), true).otherwise(false))
  .withColumn("repo_is_demo", when(demoWords.map($"repo_clean_description".like(_)).reduce(_ or _) and $"repo_stargazers_count" <= 40, true).otherwise(false))
  .withColumn("repo_is_blog", when(blogWords.map($"repo_clean_description".like(_)).reduce(_ or _) and $"repo_stargazers_count" <= 40, true).otherwise(false))
  .where($"repo_is_unmaintained" === false)
  .where($"repo_is_assignment" === false)
  .where($"repo_is_demo" === false)
  .where($"repo_is_blog" === false)
  .withColumn("repo_clean_language", lower($"repo_language"))
  .withColumn("repo_clean_topics", lower($"repo_topics"))
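
A note on the LIKE patterns used above: in SQL, % matches any run of characters, so "%demo project%" matches any description containing that phrase, while a pattern without wildcards such as "test" or "my blog" matches only when the whole description equals it. The plain Scala sketch below mimics that behaviour without Spark. likeToRegex and like are hypothetical helper names, and escape characters are not handled.

```scala
import java.util.regex.Pattern

object LikeSketch {
  // Translate a SQL LIKE pattern into a regex: % matches any run of characters,
  // _ matches exactly one character, everything else is taken literally.
  def likeToRegex(pattern: String): String =
    pattern.map {
      case '%' => ".*"
      case '_' => "."
      case c => Pattern.quote(c.toString)
    }.mkString

  // True when the whole value matches the LIKE pattern.
  def like(value: String, pattern: String): Boolean =
    Pattern.compile(likeToRegex(pattern), Pattern.DOTALL).matcher(value).matches()
}
```

Under these rules like("a small demo project", "%demo project%") is true, whereas like("my test suite", "test") is false because the pattern has no wildcards.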

Construct Features

For users, we construct new features based on the brainstormed features above:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val webThings = Array("web", "fullstack", "full stack")
val backendThings = Array("backend", "back end", "back-end")
val frontendThings = Array("frontend", "front end", "front-end")
val mobileThings = Array("mobile", "ios", "android")
val devopsThings = Array("devops", "sre", "admin", "infrastructure")
val dataThings = Array("machine learning", "deep learning", "data scien", "data analy")
val recsysThings = Array("data mining", "recommend", "information retrieval")

val leadTitles = Array("team lead", "architect", "creator", "director", "cto", "vp of engineering")
val scholarTitles = Array("researcher", "scientist", "phd", "professor")
val freelancerTitles = Array("freelance")
val juniorTitles = Array("junior", "beginner", "newbie")
val pmTitles = Array("product manager")

val userStarredReposCountDF = rawStarringDS
  .groupBy($"user_id")
  .agg(count("*").alias("user_starred_repos_count"))

val starringRepoInfoDF = rawStarringDS
  .select($"user_id", $"repo_id", $"starred_at")
  .join(rawRepoInfoDS, Seq("repo_id"))

val userTopLanguagesDF = starringRepoInfoDF
  .withColumn("rank", rank.over(Window.partitionBy($"user_id").orderBy($"starred_at".desc)))
  .where($"rank" <= 50)
  .groupBy($"user_id")
  .agg(collect_list(lower($"repo_language")).alias("user_recent_repo_languages"))
  .select($"user_id", $"user_recent_repo_languages")

val userTopTopicsDF = starringRepoInfoDF
  .where($"repo_topics" =!= "")
  .withColumn("rank", rank.over(Window.partitionBy($"user_id").orderBy($"starred_at".desc)))
  .where($"rank" <= 50)
  .groupBy($"user_id")
  .agg(concat_ws(",", collect_list(lower($"repo_topics"))).alias("temp_user_recent_repo_topics"))
  .select($"user_id", split($"temp_user_recent_repo_topics", ",").alias("user_recent_repo_topics"))

val userTopDescriptionDF = starringRepoInfoDF
  .where($"repo_description" =!= "")
  .withColumn("rank", rank.over(Window.partitionBy($"user_id").orderBy($"starred_at".desc)))
  .where($"rank" <= 50)
  .groupBy($"user_id")
  .agg(concat_ws(" ", collect_list(lower($"repo_description"))).alias("user_recent_repo_descriptions"))
  .select($"user_id", $"user_recent_repo_descriptions")

// The keyword arrays contain no % wildcards, so contains() is used for substring
// matching; like() without wildcards would only match a bio equal to the keyword.
val constructedUserInfoDF = cleanUserInfoDF
  .withColumn("user_knows_web", when(webThings.map($"user_clean_bio".contains(_)).reduce(_ or _), true).otherwise(false))
  .withColumn("user_knows_backend", when(backendThings.map($"user_clean_bio".contains(_)).reduce(_ or _), true).otherwise(false))
  .withColumn("user_knows_frontend", when(frontendThings.map($"user_clean_bio".contains(_)).reduce(_ or _), true).otherwise(false))
  .withColumn("user_knows_mobile", when(mobileThings.map($"user_clean_bio".contains(_)).reduce(_ or _), true).otherwise(false))
  .withColumn("user_knows_devops", when(devopsThings.map($"user_clean_bio".contains(_)).reduce(_ or _), true).otherwise(false))
  .withColumn("user_knows_data", when(dataThings.map($"user_clean_bio".contains(_)).reduce(_ or _), true).otherwise(false))
  .withColumn("user_knows_recsys", when(recsysThings.map($"user_clean_bio".contains(_)).reduce(_ or _), true).otherwise(false))
  .withColumn("user_is_lead", when(leadTitles.map($"user_clean_bio".contains(_)).reduce(_ or _), true).otherwise(false))
  .withColumn("user_is_scholar", when(scholarTitles.map($"user_clean_bio".contains(_)).reduce(_ or _), true).otherwise(false))
  .withColumn("user_is_freelancer", when(freelancerTitles.map($"user_clean_bio".contains(_)).reduce(_ or _), true).otherwise(false))
  .withColumn("user_is_junior", when(juniorTitles.map($"user_clean_bio".contains(_)).reduce(_ or _), true).otherwise(false))
  .withColumn("user_is_pm", when(pmTitles.map($"user_clean_bio".contains(_)).reduce(_ or _), true).otherwise(false))
  .withColumn("user_followers_following_ratio", round($"user_followers_count" / ($"user_following_count" + lit(1.0)), 3))
  .withColumn("user_days_between_created_at_today", datediff(current_date(), $"user_created_at"))
  .withColumn("user_days_between_updated_at_today", datediff(current_date(), $"user_updated_at"))
  .join(userStarredReposCountDF, Seq("user_id"))
  .withColumn("user_avg_daily_starred_repos_count", round($"user_starred_repos_count" / ($"user_days_between_created_at_today" + lit(1.0)), 3))
  .join(userTopDescriptionDF, Seq("user_id"))
  .join(userTopTopicsDF, Seq("user_id"))
  .join(userTopLanguagesDF, Seq("user_id"))
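
The "most recent N starred repos per user" logic behind the userTop* DataFrames can be sketched without Spark as below. Star and recentLanguages are made-up names for this example. Two differences from the Spark version are worth keeping in mind: rank() gives tied timestamps the same rank, so the windowed query may keep more than 50 rows per user, and collect_list after a groupBy does not guarantee the order of the collected values, whereas this sketch always returns newest first.

```scala
object RecentStarsSketch {
  final case class Star(userId: Int, repoLanguage: String, starredAt: Long)

  // For each user, keep the languages of the n most recently starred repos,
  // newest first.
  def recentLanguages(stars: Seq[Star], n: Int): Map[Int, Seq[String]] =
    stars.groupBy(_.userId).map { case (userId, userStars) =>
      userId -> userStars.sortBy(s => -s.starredAt).take(n).map(_.repoLanguage)
    }
}
```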

For repos, we likewise construct new features based on the brainstormed features above; this is just enough to get the idea across:

import org.apache.spark.sql.functions._

val vintaStarredRepos = rawStarringDS
  .where($"user_id" === 652070)
  .select($"repo_id".as[Int])
  .collect()
  .to[List]

val constructedRepoInfoDF = cleanRepoInfoDF
  .withColumn("repo_has_activities_in_60days", datediff(current_date(), $"repo_pushed_at") <= 60)
  .withColumn("repo_has_homepage", when($"repo_homepage" === "", false).otherwise(true))
  .withColumn("repo_is_vinta_starred", when($"repo_id".isin(vintaStarredRepos: _*), true).otherwise(false))
  .withColumn("repo_days_between_created_at_today", datediff(current_date(), $"repo_created_at"))
  .withColumn("repo_days_between_updated_at_today", datediff(current_date(), $"repo_updated_at"))
  .withColumn("repo_days_between_pushed_at_today", datediff(current_date(), $"repo_pushed_at"))
  .withColumn("repo_subscribers_stargazers_ratio", round($"repo_subscribers_count" / ($"repo_stargazers_count" + lit(1.0)), 3))
  .withColumn("repo_forks_stargazers_ratio", round($"repo_forks_count" / ($"repo_stargazers_count" + lit(1.0)), 3))
  .withColumn("repo_open_issues_stargazers_ratio", round($"repo_open_issues_count" / ($"repo_stargazers_count" + lit(1.0)), 3))
  .withColumn("repo_text", lower(concat_ws(" ", $"repo_owner_username", $"repo_name", $"repo_language", $"repo_description")))

ref:
https://databricks.com/blog/2015/09/16/apache-spark-1-5-dataframe-api-highlights.html

Convert Features

For users, this mainly means binning some categorical features:

import org.apache.spark.sql.functions._

val companyCountDF = cleanUserInfoDF
  .groupBy($"user_clean_company")
  .agg(count("*").alias("count_per_user_company"))

val locationCountDF = cleanUserInfoDF
  .groupBy($"user_clean_location")
  .agg(count("*").alias("count_per_user_location"))

val transformedUserInfoDF = constructedUserInfoDF
  .join(companyCountDF, Seq("user_clean_company"))
  .join(locationCountDF, Seq("user_clean_location"))
  .withColumn("user_has_blog", when($"user_blog" === "", 0.0).otherwise(1.0))
  .withColumn("user_binned_company", when($"count_per_user_company" <= 5, "__other").otherwise($"user_clean_company"))
  .withColumn("user_binned_location", when($"count_per_user_location" <= 50, "__other").otherwise($"user_clean_location"))

For repos:

import org.apache.spark.sql.functions._

val languagesDF = cleanRepoInfoDF
  .groupBy($"repo_clean_language")
  .agg(count("*").alias("count_per_repo_language"))
  .select($"repo_clean_language", $"count_per_repo_language")
  .cache()

val transformedRepoInfoDF = constructedRepoInfoDF
  .join(languagesDF, Seq("repo_clean_language"))
  .withColumn("repo_binned_language", when($"count_per_repo_language" <= 30, "__other").otherwise($"repo_clean_language"))
  .withColumn("repo_clean_topics", split($"repo_topics", ","))

ref:
https://docs.databricks.com/spark/latest/mllib/binary-classification-mllib-pipelines.html
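
The binning above replaces rare categories with a shared "__other" bucket, which keeps the one-hot encoded vectors from growing a column for every company, location, or language that appears only a handful of times. A plain Scala sketch of the same idea, with BinningSketch and binRare as illustrative names:

```scala
object BinningSketch {
  // Replace categories that occur at most `threshold` times with a shared bucket.
  def binRare(values: Seq[String], threshold: Int, other: String = "__other"): Seq[String] = {
    val counts = values.groupBy(identity).map { case (value, group) => value -> group.size }
    values.map(v => if (counts(v) <= threshold) other else v)
  }
}
```

The thresholds in the article (5 for companies, 50 for locations, 30 for languages) are tuning choices; the sketch takes the threshold as a parameter for that reason.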

Prepare the Feature Pipeline

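We filter out users who have starred an extremely large number of repos. The collected data shows that some users have starred ten or twenty thousand repos. Such an account may be a crawler, or the user may star everything they see; a recommender system is probably of little use to them, so we remove them from the dataset.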

import org.apache.spark.sql.functions._

val maxStarredReposCount = 2000

val userStarredReposCountDF = rawStarringDS
  .groupBy($"user_id")
  .agg(count("*").alias("user_starred_repos_count"))

val reducedStarringDF = rawStarringDS
  .join(userStarredReposCountDF, Seq("user_id"))
  .where($"user_starred_repos_count" <= maxStarredReposCount)
  .select($"user_id", $"repo_id", $"starred_at", $"starring")

val profileStarringDF = reducedStarringDF
  .join(userProfileDF, Seq("user_id"))
  .join(repoProfileDF, Seq("repo_id"))

Build the Feature Pipeline

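We write the feature processing steps as a Spark ML Pipeline, which makes it easy to swap or add Transformers such as Standardization, One-hot Encoding, and Word2Vec. The ALS model's prediction is also used as one of the features.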

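import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.feature._
import org.apache.spark.ml.recommendation.ALSModel
import ws.vinta.albedo.transformers.UserRepoTransformer

import scala.collection.mutable

// HanLPTokenizer and SimpleVectorAssembler are custom transformers (not part of Spark ML),
// and the *ColumnNames buffers are defined elsewhere; see the full source.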

val profileStarringDF = reducedStarringDF
  .join(userProfileDF, Seq("user_id"))
  .join(repoProfileDF, Seq("repo_id"))
  .cache()

categoricalColumnNames += "user_id"
categoricalColumnNames += "repo_id"

val userRepoTransformer = new UserRepoTransformer()
  .setInputCols(Array("repo_language", "user_recent_repo_languages"))

continuousColumnNames += "repo_language_index_in_user_recent_repo_languages"
continuousColumnNames += "repo_language_count_in_user_recent_repo_languages"

val alsModelPath = s"${settings.dataDir}/${settings.today}/alsModel.parquet"
val alsModel = ALSModel.load(alsModelPath)
  .setUserCol("user_id")
  .setItemCol("repo_id")
  .setPredictionCol("als_score")
  .setColdStartStrategy("drop")

continuousColumnNames += "als_score"

val categoricalTransformers = categoricalColumnNames.flatMap((columnName: String) => {
  val stringIndexer = new StringIndexer()
    .setInputCol(columnName)
    .setOutputCol(s"${columnName}__idx")
    .setHandleInvalid("keep")

  val oneHotEncoder = new OneHotEncoder()
    .setInputCol(s"${columnName}__idx")
    .setOutputCol(s"${columnName}__ohe")
    .setDropLast(false)

  Array(stringIndexer, oneHotEncoder)
})

val listTransformers = listColumnNames.flatMap((columnName: String) => {
  val countVectorizerModel = new CountVectorizer()
    .setInputCol(columnName)
    .setOutputCol(s"${columnName}__cv")
    .setMinDF(10)
    .setMinTF(1)

  Array(countVectorizerModel)
})

val textTransformers = textColumnNames.flatMap((columnName: String) => {
  val hanLPTokenizer = new HanLPTokenizer()
    .setInputCol(columnName)
    .setOutputCol(s"${columnName}__words")
    .setShouldRemoveStopWords(true)

  val stopWordsRemover = new StopWordsRemover()
    .setInputCol(s"${columnName}__words")
    .setOutputCol(s"${columnName}__filtered_words")
    .setStopWords(StopWordsRemover.loadDefaultStopWords("english"))

  val word2VecModelPath = s"${settings.dataDir}/${settings.today}/word2VecModel.parquet"
  val word2VecModel = Word2VecModel.load(word2VecModelPath)
    .setInputCol(s"${columnName}__filtered_words")
    .setOutputCol(s"${columnName}__w2v")

  Array(hanLPTokenizer, stopWordsRemover, word2VecModel)
})

val finalBooleanColumnNames = booleanColumnNames.toArray
val finalContinuousColumnNames = continuousColumnNames.toArray
val finalCategoricalColumnNames = categoricalColumnNames.map(columnName => s"${columnName}__ohe").toArray
val finalListColumnNames = listColumnNames.map(columnName => s"${columnName}__cv").toArray
val finalTextColumnNames = textColumnNames.map(columnName => s"${columnName}__w2v").toArray
val vectorAssembler = new SimpleVectorAssembler()
  .setInputCols(finalBooleanColumnNames ++ finalContinuousColumnNames ++ finalCategoricalColumnNames ++ finalListColumnNames ++ finalTextColumnNames)
  .setOutputCol("features")

val featureStages = mutable.ArrayBuffer.empty[PipelineStage]
featureStages += userRepoTransformer
featureStages += alsModel
featureStages ++= categoricalTransformers
featureStages ++= listTransformers
featureStages ++= textTransformers
featureStages += vectorAssembler

val featurePipeline = new Pipeline().setStages(featureStages.toArray)
val featurePipelineModel = featurePipeline.fit(profileStarringDF)

ref:
https://spark.apache.org/docs/latest/ml-pipeline.html
https://spark.apache.org/docs/latest/ml-features.html
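
To see what the StringIndexer plus OneHotEncoder pair produces for a categorical column, here is a plain Scala sketch of the idea. It indexes labels by descending frequency and reserves one extra slot for labels not seen during fitting, which is roughly what setHandleInvalid("keep") combined with setDropLast(false) gives. OneHotSketch, fitIndex, and encode are invented names, and the alphabetical tie-break is a choice made for this sketch.

```scala
object OneHotSketch {
  // Assign indices by descending frequency, breaking ties alphabetically
  // (the tie-break is a choice made for this sketch).
  def fitIndex(values: Seq[String]): Map[String, Int] =
    values.groupBy(identity).toSeq
      .map { case (value, group) => (value, group.size) }
      .sortBy { case (value, count) => (-count, value) }
      .map(_._1)
      .zipWithIndex
      .toMap

  // One-hot encode with one extra slot at the end for labels not seen during fitting.
  def encode(index: Map[String, Int], value: String): Vector[Double] = {
    val size = index.size + 1
    val position = index.getOrElse(value, index.size)
    Vector.tabulate(size)(i => if (i == position) 1.0 else 0.0)
  }
}
```

Spark stores these vectors in sparse form, which is what makes one-hot encoding high-cardinality columns such as user_id and repo_id feasible.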

Handle Imbalanced Data

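Because we are training a binary classification model, we need both positive and negative samples. However, our raw data rawStarringDS contains only positive samples: we know which repos a user has starred, but not which repos a user chose not to star. We could treat every repo a user has not starred as a negative sample, but that would produce far too many negatives, and it is not very reasonable either: a user may not have starred a repo simply because they do not know it exists, not because they dislike it.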

The approach we settled on is to use popular repos that the user has not starred as negative samples. We wrote a Spark Transformer to do this:

import ws.vinta.albedo.transformers.NegativeBalancer

import scala.collection.mutable

val sc = spark.sparkContext

val popularReposDS = loadPopularRepoDF()
val popularRepos = popularReposDS
  .select($"repo_id".as[Int])
  .collect()
  .to[mutable.LinkedHashSet]
val bcPopularRepos = sc.broadcast(popularRepos)

val negativeBalancer = new NegativeBalancer(bcPopularRepos)
  .setUserCol("user_id")
  .setItemCol("repo_id")
  .setTimeCol("starred_at")
  .setLabelCol("starring")
  .setNegativeValue(0.0)
  .setNegativePositiveRatio(2.0)
val balancedStarringDF = negativeBalancer.transform(reducedStarringDF)

ref:
https://github.com/vinta/albedo/blob/master/src/main/scala/ws/vinta/albedo/transformers/NegativeBalancer.scala
http://www.kdnuggets.com/2017/06/7-techniques-handle-imbalanced-data.html
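
The idea of "popular repos the user has not starred" can be sketched in plain Scala as follows. This is an illustration of the sampling rule only, under the assumption that negatives are taken in popularity order and that the count is the number of positives times the ratio; the actual NegativeBalancer may differ in its details. Sample and balance are hypothetical names.

```scala
object NegativeSamplingSketch {
  final case class Sample(userId: Int, repoId: Int, label: Double)

  // For every user, add up to ratio * (number of positives) negatives, taken in
  // popularity order from repos the user has not starred.
  def balance(positives: Seq[(Int, Int)], popularRepos: Seq[Int], ratio: Double): Seq[Sample] = {
    val starredByUser = positives.groupBy(_._1).map { case (userId, pairs) =>
      userId -> pairs.map(_._2).toSet
    }
    val negatives = starredByUser.toSeq.sortBy(_._1).flatMap { case (userId, starred) =>
      val wanted = math.round(starred.size * ratio).toInt
      popularRepos.filterNot(starred.contains).take(wanted).map(repoId => Sample(userId, repoId, 0.0))
    }
    positives.map { case (userId, repoId) => Sample(userId, repoId, 1.0) } ++ negatives
  }
}
```

With a ratio of 2.0, as in setNegativePositiveRatio(2.0), a user with two starred repos receives up to four negatives.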

Split Data

We use a simple holdout split, randomly assigning rows to the training set and the test set. An alternative is to split by time, using past data to predict later behavior.

val profileBalancedStarringDF = balancedStarringDF
  .join(userProfileDF, Seq("user_id"))
  .join(repoProfileDF, Seq("repo_id"))

val tmpDF = featurePipelineModel.transform(profileBalancedStarringDF)
val keepColumnName = tmpDF.columns.filter((columnName: String) => {
  !columnName.endsWith("__idx") &&
  !columnName.endsWith("__ohe") &&
  !columnName.endsWith("__cv") &&
  !columnName.endsWith("__words") &&
  !columnName.endsWith("__filtered_words") &&
  !columnName.endsWith("__w2v")
})
val featuredBalancedStarringDF = tmpDF.select(keepColumnName.map(col): _*)

val Array(trainingFeaturedDF, testFeaturedDF) = featuredBalancedStarringDF.randomSplit(Array(0.9, 0.1))
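
The time-based alternative mentioned above could look like the following plain Scala sketch: sort the events by timestamp and train on the oldest part. Event and splitByTime are names invented for this example, and cutting at a fraction of the rows (instead of at a fixed date) is just one possible choice.

```scala
object TimeSplitSketch {
  final case class Event(userId: Int, repoId: Int, starredAt: Long)

  // Train on the oldest `trainFraction` of events, test on the rest.
  def splitByTime(events: Seq[Event], trainFraction: Double): (Seq[Event], Seq[Event]) = {
    require(trainFraction >= 0.0 && trainFraction <= 1.0)
    val sorted = events.sortBy(_.starredAt)
    val cut = math.floor(sorted.length * trainFraction).toInt
    sorted.splitAt(cut)
  }
}
```

Unlike a random split, this keeps every test event later than every training event, so the model is never evaluated on behavior that happened before something it was trained on.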

Build the Model Pipeline

For easier extension later, we use a Spark ML Pipeline here as well. Spark ML's LogisticRegression accepts an optional weightCol that adjusts the weight of individual rows.

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.SQLTransformer
import org.apache.spark.ml.{Pipeline, PipelineStage}

import scala.collection.mutable

val weightSQL = """
SELECT *,
       1.0 AS default_weight,
       IF (starring = 1.0, 0.9, 0.1) AS positive_weight,
       IF (starring = 1.0 AND datediff(current_date(), starred_at) <= 365, 0.9, 0.1) AS recent_starred_weight
FROM __THIS__
""".stripMargin
val weightTransformer = new SQLTransformer()
  .setStatement(weightSQL)

val lr = new LogisticRegression()
  .setMaxIter(200)
  .setRegParam(0.7)
  .setElasticNetParam(0.0)
  .setStandardization(true)
  .setLabelCol("starring")
  .setFeaturesCol("features")
  .setWeightCol("recent_starred_weight")

val modelStages = mutable.ArrayBuffer.empty[PipelineStage]
modelStages += weightTransformer
modelStages += lr

val modelPipeline = new Pipeline().setStages(modelStages.toArray)
val modelPipelineModel = modelPipeline.fit(trainingFeaturedDF)

ref:
https://spark.apache.org/docs/latest/ml-classification-regression.html
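
One way to read weightCol: as we understand it, each row's contribution to the training loss is scaled by its weight, so rows with weight 0.9 matter much more than rows with weight 0.1. The plain Scala sketch below computes such a weighted log loss for given predictions. It illustrates the idea only and is not Spark's optimizer code (regularization is left out); WeightedLossSketch and its methods are made-up names.

```scala
object WeightedLossSketch {
  // Log loss for one row, with the predicted probability clipped away from 0 and 1.
  def logLoss(label: Double, probability: Double): Double = {
    val p = math.min(math.max(probability, 1e-15), 1.0 - 1e-15)
    -(label * math.log(p) + (1.0 - label) * math.log(1.0 - p))
  }

  // Weighted average of per-row losses; each row is (label, predicted probability, weight).
  def weightedLogLoss(rows: Seq[(Double, Double, Double)]): Double = {
    val weightSum = rows.map(_._3).sum
    require(weightSum > 0.0, "weights must sum to a positive value")
    rows.map { case (label, p, w) => w * logLoss(label, p) }.sum / weightSum
  }
}
```

With recent_starred_weight, a badly predicted recent positive is penalized far more than a badly predicted negative, which pushes the model toward getting recent stars right.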

Evaluate the Model: Classification

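Because Logistic Regression is a binary classification model, we can evaluate the results with Spark ML's BinaryClassificationEvaluator. However, since we are building a recommender system, what really matters is the Top N ranking, so the AUC value here is only a rough reference.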

import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

val testRankedDF = modelPipelineModel.transform(testFeaturedDF)

val binaryClassificationEvaluator = new BinaryClassificationEvaluator()
  .setMetricName("areaUnderROC")
  .setRawPredictionCol("rawPrediction")
  .setLabelCol("starring")

val classificationMetric = binaryClassificationEvaluator.evaluate(testRankedDF)
println(s"${binaryClassificationEvaluator.getMetricName} = $classificationMetric")
// areaUnderROC = 0.9450631491281277

ref:
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
https://docs.databricks.com/spark/latest/mllib/binary-classification-mllib-pipelines.html
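
For intuition about what areaUnderROC measures, AUC can be read as the probability that a randomly chosen positive row gets a higher score than a randomly chosen negative row. The plain Scala sketch below computes it by comparing every positive with every negative, which is only practical for small samples and is not how Spark computes it; AucSketch is a made-up name.

```scala
object AucSketch {
  // AUC as the fraction of (positive, negative) pairs in which the positive
  // gets the higher score; ties count as half. Each element is (score, label).
  def areaUnderROC(scored: Seq[(Double, Double)]): Double = {
    val positives = scored.collect { case (score, label) if label == 1.0 => score }
    val negatives = scored.collect { case (score, label) if label == 0.0 => score }
    require(positives.nonEmpty && negatives.nonEmpty, "both classes are needed")
    val wins = positives.flatMap { p =>
      negatives.map(n => if (p > n) 1.0 else if (p == n) 0.5 else 0.0)
    }
    wins.sum / (positives.length.toLong * negatives.length)
  }
}
```

This also shows why AUC is only a rough reference for a recommender: it looks at all positive and negative pairs, not at the order within each user's Top N list.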

Generate Candidates

Another important part of a recommender system is generating the candidate item set. Here we use the following approaches:

  • ALS: recommendations from collaborative filtering
  • Content-based: recommendations based on content
  • Popularity: recommendations based on popularity

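Since this article is about the machine learning pipeline for ranking and feature engineering, we will not go into candidate generation. If you are interested, see the source code linked below or the other articles in this series.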

import ws.vinta.albedo.recommenders.ALSRecommender
import ws.vinta.albedo.recommenders.ContentRecommender
import ws.vinta.albedo.recommenders.PopularityRecommender

import scala.collection.mutable

val topK = 30

val alsRecommender = new ALSRecommender()
  .setUserCol("user_id")
  .setItemCol("repo_id")
  .setTopK(topK)

val contentRecommender = new ContentRecommender()
  .setUserCol("user_id")
  .setItemCol("repo_id")
  .setTopK(topK)
  .setEnableEvaluationMode(true)

val popularityRecommender = new PopularityRecommender()
  .setUserCol("user_id")
  .setItemCol("repo_id")
  .setTopK(topK)

val recommenders = mutable.ArrayBuffer.empty[Recommender]
recommenders += alsRecommender
recommenders += contentRecommender
recommenders += popularityRecommender

val candidateDF = recommenders
  .map((recommender: Recommender) => recommender.recommendForUsers(testUserDF))
  .reduce(_ union _)
  .select($"user_id", $"repo_id")
  .distinct()

// The output of each Recommender looks like this:
// +-------+-------+----------+------+
// |user_id|repo_id|score     |source|
// +-------+-------+----------+------+
// |652070 |1239728|0.6731846 |als   |
// |652070 |854078 |0.7187486 |als   |
// |652070 |1502338|0.70165294|als   |
// |652070 |1184678|0.7434903 |als   |
// |652070 |547708 |0.7956538 |als   |
// +-------+-------+----------+------+

ref:
https://github.com/vinta/albedo/blob/master/src/main/scala/ws/vinta/albedo/recommenders/ALSRecommender.scala
https://github.com/vinta/albedo/blob/master/src/main/scala/ws/vinta/albedo/recommenders/ContentRecommender.scala
https://github.com/vinta/albedo/blob/master/src/main/scala/ws/vinta/albedo/recommenders/PopularityRecommender.scala
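
The union-and-distinct step that merges the recommenders' outputs can be sketched in plain Scala as below. Candidate mirrors the columns shown in the sample output (user_id, repo_id, score, source); CandidateMergeSketch and merge are names invented for this example.

```scala
object CandidateMergeSketch {
  final case class Candidate(userId: Int, repoId: Int, score: Double, source: String)

  // Union the outputs of several recommenders and keep each (user, repo) pair once.
  // Only the pair is kept: scores from different sources are not on the same scale.
  def merge(results: Seq[Seq[Candidate]]): Seq[(Int, Int)] =
    results.flatten.map(c => (c.userId, c.repoId)).distinct
}
```

Dropping the per-source scores is what lets the ranking model, not the candidate generators, decide the final order.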

Predict the Ranking

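We pass the candidate items to the trained Logistic Regression model for ranking. In the resulting probability column, element 0 is the probability that the label is 0 (negative) and element 1 is the probability that the label is 1 (positive).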

val profileCandidateDF = candidateDF
  .join(userProfileDF, Seq("user_id"))
  .join(repoProfileDF, Seq("repo_id"))

val featuredCandidateDF = featurePipelineModel
  .transform(profileCandidateDF)

val rankedCandidateDF = modelPipelineModel
  .transform(featuredCandidateDF)

// rankedCandidateDF looks like this:
// +-------+--------+----------+----------------------------------------+
// |user_id|repo_id |prediction|probability                             |
// +-------+--------+----------+----------------------------------------+
// |652070 |83467664|1.0       |[0.12711894229094317,0.8728810577090568]|
// |652070 |55099616|1.0       |[0.1422859437320775,0.8577140562679224] |
// |652070 |42266235|1.0       |[0.1462014853157966,0.8537985146842034] |
// |652070 |78012800|1.0       |[0.15576081067098502,0.844239189329015] |
// |652070 |5928761 |1.0       |[0.16149848941925066,0.8385015105807493]|
// +-------+--------+----------+----------------------------------------+

ref:
https://stackoverflow.com/questions/37903288/what-do-colum-rawprediction-and-probability-of-dataframe-mean-in-spark-mllib

Evaluate the Model: Ranking

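Finally, we evaluate the ranking with NDCG (Normalized Discounted Cumulative Gain), a metric from the Information Retrieval field that measures ranking quality. Spark MLlib ships a ready-made RankingMetrics, but it only works with the RDD-based API, so we rewrote it as an Evaluator that fits the DataFrame-based API: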

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import ws.vinta.albedo.evaluators.RankingEvaluator

val userActualItemsDF = reducedStarringDF
  .withColumn("rank", rank().over(Window.partitionBy($"user_id").orderBy($"starred_at".desc)))
  .where($"rank" <= topK)
  .groupBy($"user_id")
  .agg(collect_list($"repo_id").alias("items"))

val userPredictedItemsDF = rankedCandidateDF
  .withColumn("rank", rank().over(Window.partitionBy($"user_id").orderBy(toArrayUDF($"probability").getItem(1).desc)))
  .where($"rank" <= topK)
  .groupBy($"user_id")
  .agg(collect_list($"repo_id").alias("items"))

val rankingEvaluator = new RankingEvaluator(userActualItemsDF)
  .setMetricName("NDCG@k")
  .setK(topK)
  .setUserCol("user_id")
  .setItemsCol("items")
val rankingMetric = rankingEvaluator.evaluate(userPredictedItemsDF)
println(s"${rankingEvaluator.getFormattedMetricName} = $rankingMetric")
// NDCG@30 = 0.021114356461615493
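
To make the metric itself concrete, here is a small plain-Scala sketch of NDCG@k with binary relevance (an item is relevant if the user actually starred it), which is the definition Spark's RDD-based RankingMetrics also uses. The function name ndcgAt and the sample ids are hypothetical, and the RankingEvaluator in the albedo repository may differ in details.

```scala
// Binary-relevance NDCG@k: a predicted item counts as relevant if it is in the actual set.
def ndcgAt(predicted: Seq[Int], actual: Set[Int], k: Int): Double = {
  require(k > 0, "k must be positive")
  if (actual.isEmpty) 0.0
  else {
    // Discount for the 0-based position i; the log base cancels out in DCG / IDCG
    def discount(i: Int): Double = 1.0 / math.log(i + 2.0)

    // DCG: sum the discounts of the relevant items among the top k predictions
    val dcg = predicted.take(k).zipWithIndex.collect {
      case (item, i) if actual.contains(item) => discount(i)
    }.sum

    // IDCG: the best possible DCG, with all relevant items (at most k) ranked first
    val idcg = (0 until math.min(actual.size, k)).map(discount).sum

    dcg / idcg
  }
}

val actualItems = Set(1, 2, 3) // hypothetical repo ids the user starred
val perfect = ndcgAt(Seq(1, 2, 3, 9), actualItems, k = 3) // all top-3 predictions are relevant
val partial = ndcgAt(Seq(9, 1, 8, 2), actualItems, k = 3) // only one relevant item in the top 3
val missed  = ndcgAt(Seq(7, 8, 9), actualItems, k = 3)    // nothing relevant

println(s"perfect = $perfect, partial = $partial, missed = $missed")
```

A value as low as the NDCG@30 above means that few of the top 30 predictions are among the repos the user actually starred, or that they sit low in the list.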

ref:
https://github.com/vinta/albedo/blob/master/src/main/scala/ws/vinta/albedo/evaluators/RankingEvaluator.scala
https://spark.apache.org/docs/latest/mllib-evaluation-metrics.html#ranking-systems
https://weekly.codetengu.com/issues/83#kOxuVxW

Spark best practices

Architecture

A Spark application is a set of processes running on a cluster, all coordinated by the driver program. The driver program is 1) the process where the main() method of your program runs, 2) the process running the code that creates a SparkSession, RDDs, and DataFrames, and that stages or sends off transformations and actions.

The processes that run computations and store data for your application are executors. Executors 1) return computed results to the driver program, 2) provide in-memory storage for cached RDDs/DataFrames.

Execution of a Spark program:

  • The driver program runs the Spark application, which creates a SparkSession upon start-up.
  • The SparkSession connects to a cluster manager (e.g., YARN/Mesos) which allocates resources.
  • Spark acquires executors on nodes in the cluster, which are processes that run computations and store data for your application. Actions are executed on the workers.
  • The driver program sends your application code (e.g., the functions passed to map()) to the executors.
  • Transformations and actions are queued up and optimized by the driver program, sent to executors to run, and then the executors send results back to the driver program.

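Terminology:

  • Driver program: effectively the master; it creates the SparkSession, and your Spark app's main() runs on it
  • Worker node: the machines in the cluster other than the driver, which actually run the distributed computation; basically one machine is one worker (or slave)
  • Executor: a process on a worker node; an executor is given one or more cores and runs one task per core at a time
  • Cluster Manager: manages resources, usually YARN; the driver program asks the cluster manager for executors on the workers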

ref:
https://spark.apache.org/docs/latest/cluster-overview.html
https://databricks.com/blog/2016/06/22/apache-spark-key-terms-explained.html

Resilient Distributed Dataset (RDD)

RDDs are divided into partitions: each partition can be considered as an immutable subset of the entire RDD. When you execute your Spark program, each partition gets sent to a worker.

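Operations on an RDD fall into two categories:

  • Any operation that returns an RDD is a transformation
    • It lazily builds a new RDD, for example map(), flatMap(), filter()
  • Any operation that does not return an RDD is an action
    • It eagerly runs the computation, for example reduce(), count(), collect()

Take someRDD.foreach(println) as an example: foreach is an action because it returns Unit. println actually prints to the stdout of the executors, so you see nothing in the driver program unless you change it to someRDD.collect().foreach(println).

Take someRDD.take(10) as an example: take(10) is computed on the executors, but the 10 results are sent back to the driver program.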

ref:
https://zhangyi.gitbooks.io/spark-in-action/content/chapter2/rdd.html

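Take logRDD.filter(line => line.contains("ERROR")).take(10) as an example: because filter() is lazy, Spark knows that only take(10) is needed in the end, so it can stop as soon as filter has collected 10 matching lines instead of filtering the whole dataset.

Take logRDD.map(_.toLowerCase).filter(_.contains("error")).count() as an example: although map() and filter() are two operations, both are lazily evaluated, so at the count() stage Spark can work out that it can apply .toLowerCase and .contains("error") together while reading each log line.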
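
The early-stopping behaviour can be imitated without Spark. The sketch below is only an analogy: a plain Scala Iterator is lazy in a similar way to RDD transformations, and forcing it with toList plays the role of an action. The log lines and the inspected counter are made up for illustration.

```scala
// Plain-Scala analogy, not Spark: Iterator operations are lazy like RDD transformations.
val logLines = Seq("INFO start", "ERROR disk full", "INFO ok", "ERROR timeout", "INFO done")

var inspected = 0 // counts how many lines the filter predicate actually looked at

val firstError = logLines.iterator
  .filter { line => inspected += 1; line.contains("ERROR") } // lazy: nothing runs yet
  .take(1)                                                   // still lazy
  .toList                                                    // forces evaluation, like an action

println(firstError)
println(s"inspected $inspected of ${logLines.size} lines")
```

Because evaluation is only triggered at the end, the predicate stops being called once enough matching lines have been found.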

By default, RDDs are recomputed each time you run an action on them! Spark allows you to control what should be cached in memory.

ref:
https://www.coursera.org/learn/scala-spark-big-data/lecture/0GZV7/evaluation-in-spark-unlike-scala-collections

Shuffling

Moving data from one node to another across the network is called "shuffling". A shuffle can occur when the resulting RDD depends on other elements from the same RDD or another RDD. You can also figure out whether a shuffle has been planned via 1) the return type of certain transformations (e.g. ShuffledRDD); 2) using toDebugString to see the execution plan.

Operations that might cause a shuffle:

  • cogroup
  • groupWith
  • join
  • leftOuterJoin
  • rightOuterJoin
  • groupByKey
  • reduceByKey
  • combineByKey
  • distinct
  • intersection
  • repartition
  • coalesce

Narrow dependencies:

  • map
  • mapValues
  • flatMap
  • filter
  • mapPartitions
  • mapPartitionsWithIndex

Wide dependencies:

  • cogroup
  • groupWith
  • join
  • leftOuterJoin
  • rightOuterJoin
  • groupByKey
  • reduceByKey
  • combineByKey
  • distinct
  • intersection

ref:
https://www.coursera.org/learn/scala-spark-big-data/lecture/bT1YR/shuffling-what-it-is-and-why-its-important

Partitioning

Partitions never span multiple machines: data in the same partition are guaranteed to be on the same machine. The default number of partitions is the total number of cores on all executor nodes.

Customizing partitions is only possible when working with Pair RDDs, because partitioning is done based on keys. The most important thing is that you must cache() or persist() your RDDs after re-partitioning; otherwise the data is re-shuffled every time the RDD is reused.
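
To see why partitioning needs keys, here is a plain-Scala sketch of the idea behind hash partitioning: the partition index is the non-negative remainder of the key's hashCode divided by the number of partitions, which is the principle Spark's HashPartitioner follows. The function partitionFor and the sample data are hypothetical and only illustrate the principle.

```scala
// Partition index = non-negative (key.hashCode mod numPartitions)
def partitionFor(key: Any, numPartitions: Int): Int = {
  val rawMod = key.hashCode % numPartitions
  if (rawMod < 0) rawMod + numPartitions else rawMod
}

// Made-up (user, repo_id) pairs, keyed by user
val starrings = Seq(("vinta", 1), ("alice", 2), ("vinta", 3), ("bob", 4), ("alice", 5))
val numPartitions = 4

// Group the pairs by the partition their key maps to
val partitioned: Map[Int, Seq[(String, Int)]] =
  starrings.groupBy { case (user, _) => partitionFor(user, numPartitions) }

// For every key, count how many partitions contain it (should always be exactly one)
val partitionsPerKey: Map[String, Int] = starrings.map(_._1).distinct.map { user =>
  user -> partitioned.count { case (_, rows) => rows.exists(_._1 == user) }
}.toMap

partitioned.foreach { case (index, rows) => println(s"partition $index: $rows") }
```

Since all pairs with the same key land in the same partition, a later key-based operation on data partitioned this way does not need to move them across the network again.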

The following operations hold on to (and propagate) a partitioner:

  • cogroup
  • groupWith
  • groupByKey
  • reduceByKey
  • foldByKey
  • combineByKey
  • partitionBy
  • join
  • sort
  • mapValues (if parent has a partitioner)
  • flatMapValues (if parent has a partitioner)
  • filter (if parent has a partitioner)

All other operations (e.g. map()) will produce a result without a partitioner.

ref:
https://www.coursera.org/learn/scala-spark-big-data/lecture/Vkhm0/partitioning

You can chain a call to .repartition(n) after reading the text file to specify a different and larger number of partitions. For example, you might set this higher so that the number of partitions matches the number of cores in your cluster.

ref:
https://www.safaribooksonline.com/library/view/learning-apache-spark/9781785885136/ch01s04.html
https://umbertogriffo.gitbooks.io/apache-spark-best-practices-and-tuning/content/sparksqlshufflepartitions_draft.html

It's really important to repartition your dataset if you are going to cache it and use it for queries. The optimal number of partitions is around 4-6 for each executor core; with 40 nodes and 6 executor cores we use 1000 partitions for best performance.
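
The rule of thumb above can be checked with a few lines of arithmetic. This sketch assumes that "6 executor cores" means 6 cores per node, which the quoted sentence does not state explicitly, and the function name suggestedPartitions is made up.

```scala
// partitions = nodes x cores per node x partitions per core
def suggestedPartitions(nodes: Int, coresPerNode: Int, partitionsPerCore: Int): Int =
  nodes * coresPerNode * partitionsPerCore

val lowerBound = suggestedPartitions(nodes = 40, coresPerNode = 6, partitionsPerCore = 4)
val upperBound = suggestedPartitions(nodes = 40, coresPerNode = 6, partitionsPerCore = 6)

println(s"suggested range: $lowerBound to $upperBound partitions")
```

Under that assumption, the 1000 partitions mentioned in the quote fall inside the suggested range.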

ref:
https://databricks.com/blog/2015/10/13/interactive-audience-analytics-with-apache-spark-and-hyperloglog.html
https://stackoverflow.com/questions/40416357/spark-sql-difference-between-df-repartition-and-dataframewriter-partitionby

Dataset

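The Dataset transformation API is split into untyped and typed operations. An untyped operation returns a DataFrame and loses the type information, whereas a typed operation returns a Dataset. For an API such as select(), adding explicit type casts, for example ds.select($"name".as[String], $"age".as[Int]), makes it return a Dataset instead of a DataFrame.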

Untyped transformations:

  • Almost every transformation available on a DataFrame, for example groupBy()

Typed transformations:

  • map()
  • flatMap()
  • reduce()
  • groupByKey()
  • agg()
  • mapGroups()
  • flatMapGroups()
  • reduceGroups()

Datasets don't get all of the optimizations that DataFrames get!

Prefer the Spark SQL style (relational operations), such as filter($"city".as[String] === "Boston") and select($"age".as[Int]), over higher-order functions (functional operations), such as filter(p => p.city == "Boston") and map(), because Catalyst cannot optimize the latter well.
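
The two styles can be compared side by side. The following is a minimal sketch that assumes Spark SQL is on the classpath and uses a local SparkSession; the Person case class and its rows are hypothetical. Both queries return the same ages, but only the relational one exposes its condition to Catalyst as a column expression.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical record type used only for this illustration
case class Person(name: String, city: String, age: Int)

val spark = SparkSession.builder().master("local[*]").appName("dataset-sketch").getOrCreate()
import spark.implicits._

val peopleDS = Seq(
  Person("Ann", "Boston", 31),
  Person("Bob", "Taipei", 28),
  Person("Cat", "Boston", 45)
).toDS()

// Relational style: the filter is a column expression that Catalyst can analyze
val relational = peopleDS.filter($"city" === "Boston").select($"age".as[Int])

// Functional style: the lambdas are opaque JVM functions from Catalyst's point of view
val functional = peopleDS.filter((p: Person) => p.city == "Boston").map(_.age)

// Compare the physical plans printed for the two queries
relational.explain()
functional.explain()
```

Comparing the two explain() outputs is a quick way to see how much of a query Catalyst is able to reason about.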

ref:
https://www.coursera.org/learn/scala-spark-big-data/lecture/yrfPh/datasets
https://www.51zero.com/blog/2016/2/24/type-safety-on-spark-dataframes-part-1

Encoders

Encoders are what convert your data between JVM objects and Spark SQL's specialized internal representation. They're required by all Datasets.

Two ways to introduce Encoders:

  • Automatically via import spark.implicits._
  • Explicitly via org.apache.spark.sql.Encoder

ref:
https://www.coursera.org/learn/scala-spark-big-data/lecture/yrfPh/datasets

Broadcast

If a DataFrame is small enough to fit on a single machine, you can broadcast it.
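
A typical use is a broadcast join. The sketch below assumes Spark SQL is on the classpath and uses a local SparkSession; the two DataFrames are made up, with starringDF standing in for a large table and repoLanguageDF for a small lookup table.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

val spark = SparkSession.builder().master("local[*]").appName("broadcast-sketch").getOrCreate()
import spark.implicits._

// Made-up data: a "large" fact table and a small lookup table
val starringDF = Seq((1, 101), (2, 102), (3, 101), (4, 103)).toDF("user_id", "repo_id")
val repoLanguageDF = Seq((101, "Scala"), (102, "Python"), (103, "Go")).toDF("repo_id", "language")

// broadcast() hints Spark to ship the small DataFrame to every executor,
// so the join can avoid shuffling the large side
val joinedDF = starringDF.join(broadcast(repoLanguageDF), Seq("repo_id"))

joinedDF.explain() // inspect the physical plan to see which join strategy was chosen
joinedDF.show()
```

The hint only makes sense when the broadcast side really is small, because every executor keeps a full copy of it in memory.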

ref:
https://umbertogriffo.gitbooks.io/apache-spark-best-practices-and-tuning/content/when_to_use_broadcast_variable.html

Spark UI

If your dataset is large, you can try repartitioning to a larger number of partitions to allow more parallelism in your job. A good indication that this is needed is when the Spark UI shows only a few tasks, each of which is very slow to complete.

ref:
https://databricks.com/blog/2015/06/22/understanding-your-spark-application-through-visualization.html
https://databricks.com/blog/2016/10/18/7-tips-to-debug-apache-spark-code-faster-with-databricks.html
https://docs.databricks.com/spark/latest/rdd-streaming/debugging-streaming-applications.html

Configurations

ref:
https://spark.apache.org/docs/latest/configuration.html
https://spark.apache.org/docs/latest/tuning.html#memory-tuning

My spark-defaults.conf

spark.driver.maxResultSize       2g
spark.jars.packages              com.github.fommil.netlib:all:1.1.2,com.hankcs:hanlp:portable-1.3.4,mysql:mysql-connector-java:5.1.41,org.apache.httpcomponents:httpclient:4.5.2,org.elasticsearch.client:elasticsearch-rest-high-level-client:5.6.2
spark.kryoserializer.buffer.max  1g
spark.serializer                 org.apache.spark.serializer.KryoSerializer

Allocate Resources (Executors, Cores, and Memory)

When submitting a Spark application via spark-submit, you may specify the following options:

  • --driver-memory MEM: Memory for driver (e.g. 1000M, 2G) (Default: 1024M).
  • --executor-memory MEM: Memory per executor (e.g. 1000M, 2G) (Default: 1G).

YARN and Spark standalone only:

  • --executor-cores NUM: Number of cores per executor. (Default: 1 in YARN mode, or all available cores on the worker in standalone mode)

YARN only:

  • --driver-cores NUM: Number of cores used by the driver, only in cluster mode (Default: 1).
  • --num-executors NUM: Number of executors to launch (Default: 2).

How to calculate:

driver: 2 cores, 7.5 GB x 1
worker: 8 cores, 30 GB x 4

  • actual memory for driver = driver memory - (10% overhead x 2)
    • 7.5 - (7.5 x 0.1 x 2) = 6
    • spark.driver.memory=6g
  • cores per executor = 5
    • for good HDFS throughput
    • spark.executor.cores=5
    • --executor-cores 5
  • total cores in cluster = (cores per node - 1) x total nodes
    • leave 1 core per node for YARN daemons
    • (8 - 1) x 4 = 28
  • total executors in cluster = (total cores in cluster / cores per executor) - 1
    • leave 1 executor for the YARN ApplicationMaster
    • (28 / 5) - 1 = 4
    • spark.executor.instances=4
    • --num-executors 4
  • executors per node = (total executors in cluster + 1) / total nodes
    • (4 + 1) / 4 = 1
  • memory per executor = (memory per node / executors per node) - (10% overhead x 2) - 3
    • (30 / 1) - (30 x 0.1 x 2) - 3 = 21
    • spark.executor.memory=21g
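
The steps above can be written down as a small plain-Scala calculation. This is only a sketch of the arithmetic in the list: the names Allocation and allocate are hypothetical, the 10% overhead is applied to each executor's share of the node memory (which equals the node memory in this example), and the integer divisions mirror the rounding used above.

```scala
// Hypothetical container for the derived settings
final case class Allocation(
  driverMemoryGb: Double,
  executorCores: Int,
  numExecutors: Int,
  executorsPerNode: Int,
  executorMemoryGb: Double
)

def allocate(
  driverRamGb: Double,
  workerNodes: Int,
  coresPerNode: Int,
  ramPerNodeGb: Double,
  executorCores: Int = 5 // 5 cores per executor, as suggested above for HDFS throughput
): Allocation = {
  val overhead = 0.1
  val driverMemoryGb = driverRamGb - driverRamGb * overhead * 2
  val totalCores = (coresPerNode - 1) * workerNodes       // leave 1 core per node for daemons
  val numExecutors = totalCores / executorCores - 1        // leave 1 executor for the ApplicationMaster
  val executorsPerNode = (numExecutors + 1) / workerNodes  // integer division, as in the list above
  val memoryShareGb = ramPerNodeGb / executorsPerNode      // not meaningful if executorsPerNode is 0
  val executorMemoryGb = memoryShareGb - memoryShareGb * overhead * 2 - 3
  Allocation(driverMemoryGb, executorCores, numExecutors, executorsPerNode, executorMemoryGb)
}

// The cluster from the example: one 7.5 GB driver, four workers with 8 cores and 30 GB each
val plan = allocate(driverRamGb = 7.5, workerNodes = 4, coresPerNode = 8, ramPerNodeGb = 30.0)
println(plan)
```

Plugging in a different cluster size gives a starting point for --executor-cores, --num-executors, and the memory settings, which should still be checked against the actual workload.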

ref:
https://spark.apache.org/docs/latest/submitting-applications.html
https://spark.apache.org/docs/latest/configuration.html
https://spoddutur.github.io/spark-notes/distribution_of_executors_cores_and_memory_for_spark_application
https://www.slideshare.net/cloudera/top-5-mistakes-to-avoid-when-writing-apache-spark-applications/21
http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/

Build a recommender system with Spark: Content-based and Elasticsearch

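In this series of articles, we use Apache Spark, XGBoost, Elasticsearch, and MySQL to build a Machine Learning Pipeline for a recommender system. A recommender system can be roughly divided into two parts, Candidate Generation and Ranking. The former generates a set of candidate items for each user, commonly with Collaborative Filtering, Content-based recommendation, tag matching, popularity rankings, or manual curation; the latter ranks those candidates and presents the final recommendations as a Top N list, commonly with Logistic Regression.

In this article, we take one of the common Candidate Generation methods, Content-based recommendation, as an example and use Elasticsearch's More Like This query to build a recommender system for GitHub repositories: the repos a user starred recently are the input, and similar repos are matched as the candidate items.

As an aside, I originally planned to use Spark to turn the repos' text data into Word2Vec vectors and precompute the similarity between every pair of repos (a so-called Similarity Join). However, computing similarities among that many repos costs too much time and hardware, and even Approximate Nearest Neighbor Search with DIMSUM and Locality Sensitive Hashing (LSH) did not work very well. Then it occurred to me that finding similar or related items is exactly what a search engine does, so I simply put all the repo data into Elasticsearch and used the document id as the search condition; a single More Like This query solved the problem. After all, not everything has to be done in Spark.

The complete code is available at https://github.com/vinta/albedo.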

Setup Elasticsearch

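To keep things simple, we use the official Docker image. Note that Elasticsearch 5.x/6.x changed quite a lot compared with earlier versions, for example X-Pack, the high-level REST client, and the upcoming restriction of one mapping type per index, so it is worth skimming the documentation when you have time.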

# in elasticsearch.yml
bootstrap.memory_lock: true
cluster.name: albedo
discovery.type: single-node
http.host: 0.0.0.0
node.name: ${HOSTNAME}
xpack.security.enabled: false

# in docker-compose.yml
version: "3"
services:
  django:
    build: .
    hostname: django
    working_dir: /app
    env_file: .docker-assets/django.env
    command: .docker-assets/django_start.sh
    ports:
      - 8000:8000
    volumes:
      - ".:/app"
      - "../albedo-vendors/bin:/usr/local/bin"
      - "../albedo-vendors/dist-packages:/usr/local/lib/python3.5/dist-packages"
    links:
      - mysql
      - elasticsearch
  mysql:
    image: vinta/mysql:5.7
    hostname: mysql
    env_file: .docker-assets/mysql.env
    command: mysqld --character-set-server=utf8 --collation-server=utf8_unicode_ci
    ports:
      - 3306:3306
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:5.6.2
    ports:
      - 9200:9200
      - 9300:9300
    volumes:
      - "./.docker-assets/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml"
    environment:
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"

$ docker-compose up

Then you can access your Elasticsearch cluster at http://127.0.0.1:9200/.

ref:
https://www.elastic.co/guide/en/elasticsearch/reference/5.6/docker.html
https://www.elastic.co/guide/en/elasticsearch/reference/5.6/security-settings.html

Define the Mapping (Data Schema)

Here we use elasticsearch-dsl-py to define an index and a mapping type.

from elasticsearch.helpers import bulk
from elasticsearch_dsl import analyzer
from elasticsearch_dsl import Date, Integer, Keyword, Text, Boolean
from elasticsearch_dsl import Index, DocType
from elasticsearch_dsl.connections import connections

client = connections.create_connection(hosts=['elasticsearch'])

repo_index = Index('repo')
repo_index.settings(
    number_of_shards=1,
    number_of_replicas=0
)

text_analyzer = analyzer(
    'text_analyzer',
    char_filter=["html_strip"],
    tokenizer="standard",
    filter=["asciifolding", "lowercase", "snowball", "stop"]
)
repo_index.analyzer(text_analyzer)

@repo_index.doc_type
class RepoInfoDoc(DocType):
    owner_id = Keyword()
    owner_username = Keyword()
    owner_type = Keyword()
    name = Text(text_analyzer, fields={'raw': Keyword()})
    full_name = Text(text_analyzer, fields={'raw': Keyword()})
    description = Text(text_analyzer)
    language = Keyword()
    created_at = Date()
    updated_at = Date()
    pushed_at = Date()
    homepage = Keyword()
    size = Integer()
    stargazers_count = Integer()
    forks_count = Integer()
    subscribers_count = Integer()
    fork = Boolean()
    has_issues = Boolean()
    has_projects = Boolean()
    has_downloads = Boolean()
    has_wiki = Boolean()
    has_pages = Boolean()
    open_issues_count = Integer()
    topics = Keyword(multi=True)

    class Meta:
        index = repo_index._name

    @classmethod
    def bulk_save(cls, documents):
        dicts = (d.to_dict(include_meta=True) for d in documents)
        return bulk(client, dicts)

    def save(self, **kwargs):
        return super(RepoInfoDoc, self).save(**kwargs)

RepoInfoDoc.init()

Elasticsearch: More than a Search Engine
https://vinta.ws/code/elasticsearch-more-than-a-search-engine.html

ref:
https://github.com/elastic/elasticsearch-dsl-py

Import Data into Elasticsearch

You can move the data stored in MySQL into Elasticsearch in many ways, such as a cronjob, Celery, or MySQL binlog replication. Since our main data models are written with the Django ORM, we simply write a Django command to load the data.

from django.core.management.base import BaseCommand

from app.mappings import RepoInfoDoc
from app.models import RepoInfo

class Command(BaseCommand):
    def handle(self, *args, **options):
        def batch_qs(qs, batch_size=500):
            total = qs.count()
            for start in range(0, total, batch_size):
                end = min(start + batch_size, total)
                yield (start, end, total, qs[start:end])

        large_qs = RepoInfo.objects.filter(stargazers_count__gte=10, stargazers_count__lte=290000, fork=False)
        for start, end, total, qs_chunk in batch_qs(large_qs):
            documents = []
            for repo_info in qs_chunk:
                repo_info_doc = RepoInfoDoc()
                repo_info_doc.meta.id = repo_info.id
                repo_info_doc.owner_id = repo_info.owner_id
                repo_info_doc.owner_username = repo_info.owner_username
                repo_info_doc.owner_type = repo_info.owner_type
                repo_info_doc.name = repo_info.name
                repo_info_doc.full_name = repo_info.full_name
                repo_info_doc.description = repo_info.description
                repo_info_doc.language = repo_info.language
                repo_info_doc.created_at = repo_info.created_at
                repo_info_doc.updated_at = repo_info.updated_at
                repo_info_doc.pushed_at = repo_info.pushed_at
                repo_info_doc.homepage = repo_info.homepage
                repo_info_doc.size = repo_info.size
                repo_info_doc.stargazers_count = repo_info.stargazers_count
                repo_info_doc.forks_count = repo_info.forks_count
                repo_info_doc.subscribers_count = repo_info.subscribers_count
                repo_info_doc.fork = repo_info.fork
                repo_info_doc.has_issues = repo_info.has_issues
                repo_info_doc.has_projects = repo_info.has_projects
                repo_info_doc.has_downloads = repo_info.has_downloads
                repo_info_doc.has_wiki = repo_info.has_wiki
                repo_info_doc.has_pages = repo_info.has_pages
                repo_info_doc.open_issues_count = repo_info.open_issues_count
                repo_info_doc.topics = repo_info.topics

                documents.append(repo_info_doc)

            RepoInfoDoc.bulk_save(documents)

noplay/python-mysql-replication
https://github.com/noplay/python-mysql-replication

Find Similar Items

Because this will later serve as one of the candidate sources for the recommender system in Spark, we wrap Elasticsearch's More Like This API in a Spark Transformer, so the following parts are written in Scala.

Initialize High-level REST Client

Since Elasticsearch 5.x, the High-level REST Client is officially recommended; its usage differs slightly from the older Java TransportClient.

import org.apache.http.HttpHost
import org.elasticsearch.client.{RestClient, RestHighLevelClient}

val lowClient = RestClient.builder(new HttpHost("127.0.0.1", 9200, "http")).build()
val highClient = new RestHighLevelClient(lowClient)

ref:
https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-low-usage-initialization.html
https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high-getting-started-initialization.html

Perform the More Like This Query

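The input is userDF, a DataFrame of the users to generate candidates for. For each user we first fetch the list of recently starred repos; the repo id is the Elasticsearch document id, and we use those ids as the condition of a More Like This query to find other similar repos.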

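import org.apache.spark.sql.functions.lit
import org.elasticsearch.action.search.SearchRequest
import org.elasticsearch.index.query.MoreLikeThisQueryBuilder.Item
import org.elasticsearch.index.query.QueryBuilders.moreLikeThisQuery
import org.elasticsearch.search.SearchHit
import org.elasticsearch.search.builder.SearchSourceBuilder

val userRecommendedItemDF = userDF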
  .flatMap {
    case (userId: Int) => {
      val itemIds = selectUserStarredRepos(userId)

      val lowClient = RestClient.builder(new HttpHost("127.0.0.1", 9200, "http")).build()
      val highClient = new RestHighLevelClient(lowClient)

      val fields = Array("description", "full_name", "language", "topics")
      val texts = Array("")
      val items = itemIds.map((itemId: Int) => new Item("repo", "repo_info_doc", itemId.toString))
      val queryBuilder = moreLikeThisQuery(fields, texts, items)
        .minTermFreq(1)
        .maxQueryTerms(20)

      val searchSourceBuilder = new SearchSourceBuilder()
      searchSourceBuilder.query(queryBuilder)
      searchSourceBuilder.from(0)
      searchSourceBuilder.size($(topK))

      val searchRequest = new SearchRequest()
      searchRequest.indices("repo")
      searchRequest.types("repo_info_doc")
      searchRequest.source(searchSourceBuilder)

      val searchResponse = highClient.search(searchRequest)
      val hits = searchResponse.getHits
      val searchHits = hits.getHits

      val userItemScoreTuples = searchHits.map((searchHit: SearchHit) => {
        val itemId = searchHit.getId.toInt
        val score = searchHit.getScore
        (userId, itemId, score)
      })

      lowClient.close()

      userItemScoreTuples
    }
  }
  .toDF($(userCol), $(itemCol), $(scoreCol))
  .withColumn($(sourceCol), lit(source))

userRecommendedItemDF.show()
// +-------+--------+---------+-------+
// |user_id|repo_id |score    |source |
// +-------+--------+---------+-------+
// |652070 |26152923|44.360096|content|
// |652070 |28451314|38.752697|content|
// |652070 |16175350|35.676353|content|
// |652070 |10885469|30.280012|content|
// |652070 |24037308|28.488512|content|
// +-------+--------+---------+-------+

You can find the complete code on GitHub:
https://github.com/vinta/albedo/blob/master/src/main/scala/ws/vinta/albedo/ContentRecommenderBuilder.scala

ref:
https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-mlt-query.html
https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-specialized-queries.html

Spark ML cookbook (Scala)

Scala is the first-class language for interacting with Apache Spark, but it is difficult to learn. This article is mostly about Spark ML, the newer Spark machine learning library built on the DataFrame-based API.

Convert a String Categorical Feature into Numeric One

StringIndexer converts labels (categorical values) into numbers (0.0, 1.0, 2.0 and so on) ordered by label frequency, so the most frequent label gets 0. It can handle unseen labels with one of several optional strategies.

StringIndexer's inputCol accepts string, numeric and boolean types.

val df1 = spark.createDataFrame(Seq(
    (1, "Python"),
    (2, "C++"),
    (3, "C++"),
    (4, "JavaScript"),
    (5, "Python"),
    (6, "Python"),
    (7, "Go")
)).toDF("repo_id", "repo_language")

val df2 = spark.createDataFrame(Seq(
    (1, "Python"),
    (2, "C++"),
    (3, "C++"),
    (4, "JavaScript"),
    (5, "Python"),
    (6, "Python"),
    (7, "Go"),
    (8, "JavaScript"),
    (9, "Brainfuck"),
    (10, "Brainfuck"),
    (11, "Red")
)).toDF("repo_id", "repo_language")

import org.apache.spark.ml.feature.StringIndexer

val stringIndexer = new StringIndexer()
  .setInputCol("repo_language")
  .setOutputCol("repo_language_index")
  .setHandleInvalid("keep")
val stringIndexerModel = stringIndexer.fit(df1)

stringIndexerModel.labels
// Array[String] = Array(Python, C++, JavaScript, Go)

val indexedDF = stringIndexerModel.transform(df2)
indexedDF.show()
// +-------+-------------+-------------------+
// |repo_id|repo_language|repo_language_index|
// +-------+-------------+-------------------+
// |      1|       Python|                0.0|
// |      2|          C++|                1.0|
// |      3|          C++|                1.0|
// |      4|   JavaScript|                2.0|
// |      5|       Python|                0.0|
// |      6|       Python|                0.0|
// |      7|           Go|                3.0|
// |      8|   JavaScript|                2.0|
// |      9|    Brainfuck|                4.0| <- previously unseen
// |     10|    Brainfuck|                4.0| <- previously unseen
// |     11|          Red|                4.0| <- previously unseen
// +-------+-------------+-------------------+

ref:
https://spark.apache.org/docs/latest/ml-features.html#stringindexer
https://stackoverflow.com/questions/34681534/spark-ml-stringindexer-handling-unseen-labels
https://stackoverflow.com/questions/32277576/how-to-handle-categorical-features-with-spark-ml/32278617

Convert an Indexed Numeric Feature Back to the Original Categorical One

import org.apache.spark.ml.feature.IndexToString

val indexToString = new IndexToString()
  .setInputCol("repo_language_index")
  .setOutputCol("repo_language_ori")

val oriIndexedDF = indexToString.transform(indexedDF)
oriIndexedDF.show()
// +-------+-------------+-------------------+----------------------+
// |repo_id|repo_language|repo_language_index|     repo_language_ori|
// +-------+-------------+-------------------+----------------------+
// |      1|       Python|                0.0|                Python|
// |      2|          C++|                1.0|                   C++|
// |      3|          C++|                1.0|                   C++|
// |      4|   JavaScript|                2.0|            JavaScript|
// |      5|       Python|                0.0|                Python|
// |      6|       Python|                0.0|                Python|
// |      7|           Go|                3.0|                    Go|
// |      8|   JavaScript|                2.0|            JavaScript|
// |      9|    Brainfuck|                4.0|             __unknown| <- previously unseen
// |     10|    Brainfuck|                4.0|             __unknown| <- previously unseen
// |     11|          Red|                4.0|             __unknown| <- previously unseen
// +-------+-------------+-------------------+----------------------+

ref:
https://spark.apache.org/docs/latest/ml-features.html#indextostring

One-hot Encoding for Categorical Features

OneHotEncoder's input column only accepts numeric types. If you have string columns, you need to use StringIndexer to transform them into doubles first; besides, StringIndexer is able to deal properly with unseen values. In my humble opinion, you should always apply StringIndexer before OneHotEncoder.

Be careful: the length of OneHotEncoder's output vector is determined by the maximum value in the column, so you must apply OneHotEncoder to the union of the training set and the test set. Since OneHotEncoder does not accept an empty string as a name, you need to replace all empty strings with a placeholder, something like __empty.

import org.apache.spark.ml.feature.OneHotEncoder

val knownDF = spark.createDataFrame(Seq(
  (2, "b"),
  (3, "c"),
  (0, "x"),
  (6, "c"),
  (4, "a"),
  (1, "a"),
  (5, "a")
)).toDF("category_1", "category_2")

val unseenDF = spark.createDataFrame(Seq(
  (123, "e"),
  (6, "c"),
  (2, "b"),
  (456, "c"),
  (1, "a")
)).toDF("category_1", "category_2")

val knownOneHotDF = new OneHotEncoder()
  .setDropLast(true)
  .setInputCol("category_1")
  .setOutputCol("category_1_one_hot")
  .transform(knownDF)
knownOneHotDF.show()
// +----------+----------+------------------+
// |category_1|category_2|category_1_one_hot|
// +----------+----------+------------------+
// |         2|         b|     (6,[2],[1.0])|
// |         3|         c|     (6,[3],[1.0])|
// |         0|         x|     (6,[0],[1.0])|
// |         6|         c|         (6,[],[])|
// |         4|         a|     (6,[4],[1.0])|
// |         1|         a|     (6,[1],[1.0])|
// |         5|         a|     (6,[5],[1.0])|
// +----------+----------+------------------+

val unseenOneHotDF = new OneHotEncoder()
  .setDropLast(true)
  .setInputCol("category_1")
  .setOutputCol("category_1_one_hot")
  .transform(unseenDF)
unseenOneHotDF.show()
// +----------+----------+------------------+
// |category_1|category_2|category_1_one_hot|
// +----------+----------+------------------+
// |       123|         e| (456,[123],[1.0])|
// |         6|         c|   (456,[6],[1.0])|
// |         2|         b|   (456,[2],[1.0])|
// |       456|         c|       (456,[],[])|
// |         1|         a|   (456,[1],[1.0])|
// +----------+----------+------------------+

ref:
https://spark.apache.org/docs/latest/ml-features.html#onehotencoder
https://stackoverflow.com/questions/32277576/how-to-handle-categorical-features-with-spark-ml/40615508
https://stackoverflow.com/questions/33089781/spark-dataframe-handing-empty-string-in-onehotencoder

Create a Regular Expression Tokenizer

With setGaps(true), the pattern matches the delimiters; with setGaps(false), the pattern matches the tokens themselves.

import org.apache.spark.ml.feature.RegexTokenizer
import org.apache.spark.sql.functions._

val sentenceDF = spark.createDataFrame(Seq(
  (1, "Hi, I heard about Spark"),
  (2, "I wish Java could use case classes."),
  (3, "Deep,Learning,models,are,state-of-the-art"),
  (4, "fuck_yeah!!! No.")
)).toDF("id", "sentence")

val countTokensUDF = udf((words: Seq[String]) => words.length)

val regexTokenizer = new RegexTokenizer()
  .setInputCol("sentence")
  .setOutputCol("words")
  .setPattern("""[\w\-_]+""").setGaps(false)
  // .setPattern("""\W""").setGaps(true)
  // .setPattern("""[,. ]""").setGaps(true)
val tokenizedDF = regexTokenizer.transform(sentenceDF)

val df = tokenizedDF
  .select("sentence", "words")
  .withColumn("count", countTokensUDF($"words"))
// +-----------------------------------------+-----------------------------------------------+-----+
// |sentence                                 |words                                          |count|
// +-----------------------------------------+-----------------------------------------------+-----+
// |Hi, I heard about Spark                  |[hi, i, heard, about, spark]                   |5    |
// |I wish Java could use case classes.      |[i, wish, java, could, use, case, classes]     |7    |
// |Deep,Learning,models,are,state-of-the-art|[deep, learning, models, are, state-of-the-art]|5    |
// |fuck_yeah!!! No.                         |[fuck_yeah, no]                                |2    |
// +-----------------------------------------+-----------------------------------------------+-----+
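
The difference between the two modes can be reproduced with plain Scala regular expressions. This is only a sketch of the behaviour described above, not RegexTokenizer itself: findAllIn stands in for setGaps(false), split stands in for setGaps(true), and the input is lowercased by hand to match RegexTokenizer's default.

```scala
val sentence = "Deep,Learning,models,are,state-of-the-art".toLowerCase

// Like setGaps(false): the pattern describes the tokens themselves
val tokenPattern = """[\w\-_]+""".r
val byTokens = tokenPattern.findAllIn(sentence).toList

// Like setGaps(true): the pattern describes the separators between tokens
val gapPattern = """\W""".r
val byGaps = gapPattern.split(sentence).filter(_.nonEmpty).toList

println(byTokens) // List(deep, learning, models, are, state-of-the-art)
println(byGaps)   // List(deep, learning, models, are, state, of, the, art)
```

With the token pattern the hyphenated word survives as one token, whereas splitting on \W breaks it apart at every hyphen.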

ref:
https://spark.apache.org/docs/latest/ml-features.html#tokenizer

Handle Comma-separated Categorical Column

You could use RegexTokenizer, CountVectorizer or HashingTF.

import org.apache.spark.ml.feature.{CountVectorizer, HashingTF, RegexTokenizer}

val df = spark.createDataFrame(Seq(
  (1, "Action,Sci-Fi"),
  (2, "Sci-Fi,Romance,Horror"),
  (3, "War,Horror")
)).toDF("movie_id", "genres")

val regexTokenizer = new RegexTokenizer()
  .setInputCol("genres")
  .setOutputCol("genres_words")
  .setPattern("""[\w\-_]+""").setGaps(false)
val wordsDF = regexTokenizer.transform(df)

val countVectorizerModel = new CountVectorizer()
  .setInputCol("genres_words")
  .setOutputCol("genres_count_vector")
  .setMinDF(1) // across the corpus, drop any term that appears in fewer than n documents
  .setMinTF(1) // within each document, ignore any term that appears fewer than n times
  .fit(wordsDF)
val countVectorDF = countVectorizerModel.transform(wordsDF)

// HashingTF might suffer from potential hash collisions
// it's good to use a power of two
val hashingTF = new HashingTF()
  .setInputCol("genres_words")
  .setOutputCol("genres_htf_vector")
  .setNumFeatures(4)
val htfVectorDF = hashingTF.transform(countVectorDF)

htfVectorDF.show(false)
// +--------+---------------------+-------------------------+-------------------------+-------------------+
// |movie_id|genres               |genres_words             |genres_count_vector      |genres_htf_vector  |
// +--------+---------------------+-------------------------+-------------------------+-------------------+
// |1       |Action,Sci-Fi        |[action, sci-fi]         |(5,[0,3],[1.0,1.0])      |(4,[0],[2.0])      |
// |2       |Sci-Fi,Romance,Horror|[sci-fi, romance, horror]|(5,[0,1,4],[1.0,1.0,1.0])|(4,[0,2],[2.0,1.0])|
// |3       |War,Horror           |[war, horror]            |(5,[1,2],[1.0,1.0])      |(4,[0,2],[1.0,1.0])|
// +--------+---------------------+-------------------------+-------------------------+-------------------+

countVectorizerModel.vocabulary
// Array(sci-fi, horror, action, romance, war)
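The collision warning is easy to see with this toy vocabulary: five distinct genres cannot fit into four buckets without at least two of them sharing an index. Here is a minimal plain-Scala sketch of the hashing trick. It uses String.hashCode as a stand-in hash function, which is an assumption for illustration only; HashingTF uses its own hash function, so the actual bucket indices will differ. bucketOf and collisions are hypothetical helpers, not Spark API.

```scala
// Plain-Scala sketch of the hashing trick; bucketOf is a hypothetical helper, not Spark API.
val genres = Seq("action", "sci-fi", "romance", "horror", "war")

// Map a term to a bucket in [0, numFeatures) with a non-negative modulo.
// String.hashCode is a stand-in for the hash function Spark really uses.
def bucketOf(term: String, numFeatures: Int): Int = {
  val raw = term.hashCode % numFeatures
  if (raw < 0) raw + numFeatures else raw
}

// Group terms by bucket; any bucket holding more than one term is a collision.
def collisions(terms: Seq[String], numFeatures: Int): Map[Int, Seq[String]] =
  terms.groupBy(bucketOf(_, numFeatures)).filter { case (_, ts) => ts.size > 1 }

val collidingBuckets = collisions(genres, 4)
collidingBuckets.foreach { case (bucket, ts) =>
  println(s"bucket $bucket <- ${ts.mkString(", ")}")
}
```

Raising numFeatures well above the vocabulary size makes collisions less likely, at the cost of a wider vector.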

ref:
https://spark.apache.org/docs/latest/ml-features.html#countvectorizer
https://spark.apache.org/docs/latest/ml-features.html#tf-idf

Train a Word2Vec Model

The output vector of any Word2Vec model is dense!

import org.apache.spark.ml.feature.Word2Vec

val df = spark.createDataFrame(Seq(
  (1, "Hi I heard about Apache Spark".toLowerCase().split(" ")),
  (2, "I wish Java could use case classes".toLowerCase().split(" ")),
  (3, "Logistic regression models are neat".toLowerCase().split(" ")),
  (4, "Apache Spark with Scala is awesome".toLowerCase().split(" ")),
  (5, Array("中文", "嘛ㄟ通", "but", "必須", "另外", "分詞"))
)).toDF("id", "words")

val word2Vec = new Word2Vec()
  .setInputCol("words")
  .setOutputCol("words_w2v")
  .setMaxIter(10)
  .setVectorSize(3)
  .setWindowSize(5)
  .setMinCount(1)
val word2VecModel = word2Vec.fit(df)

word2VecModel.transform(df)
// +---+------------------------------------------+----------------------------------------------------------+
// |id |words                                     |words_w2v                                                 |
// +---+------------------------------------------+----------------------------------------------------------+
// |1  |[hi, i, heard, about, apache, spark]      |[-0.02013699459393,-0.02995631482274,0.047685102870066956]|
// |2  |[i, wish, java, could, use, case, classes]|[-0.05012317272186,0.01141336891094,-0.03742781743806387] |
// |3  |[logistic, regression, models, are, neat] |[-0.04678827972413,0.032994424477,0.0010566591750830413]  |
// |4  |[apache, spark, with, scala, is, awesome] |[0.0265524153169,0.02056275321716,0.013326843579610188]   |
// |5  |[中文, 嘛ㄟ通, but, 必須, 另外, 分詞]         |[0.0571783996973,-0.02301329133545,0.013507421438892681]  |
// +---+------------------------------------------+----------------------------------------------------------+

val df2 = spark.createDataFrame(Seq(
  (6, Array("not-in-vocabularies", "neither", "no")),
  (7, Array("spark", "hell_no", "not-in-vocabularies")),
  (8, Array("hell_no", "not-in-vocabularies", "spark")),
  (9, Array("not-in-vocabularies", "hell_no", "spark")),
  (10, Array("no", "not-in-vocabularies", "spark")),
  (11, Array("中文", "spark"))
)).toDF("id", "words")

word2VecModel.transform(df2)
// the order of words doesn't matter
// +---+-------------------------------------+-----------------------------------------------------------------+
// |id |words                                |words_w2v                                                        |
// +---+-------------------------------------+-----------------------------------------------------------------+
// |6  |[not-in-vocabularies, neither, no]   |[0.0,0.0,0.0]                                                    |
// |7  |[spark, hell_no, not-in-vocabularies]|[0.0027440187210838,-0.0529780387878418,0.05730373660723368]     |
// |8  |[hell_no, not-in-vocabularies, spark]|[0.0027440187210838,-0.0529780387878418,0.05730373660723368]     |
// |9  |[not-in-vocabularies, hell_no, spark]|[0.0027440187210838,-0.0529780387878418,0.05730373660723368]     |
// |10 |[no, not-in-vocabularies, spark]     |[0.0027440187210838,-0.0529780387878418,0.05730373660723368]     |
// |11 |[中文, spark]                         |[-0.009499748703092337,-0.018227852880954742,0.13357853144407272]|
// +---+-------------------------------------+-----------------------------------------------------------------+
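The table above is consistent with the sentence vector being the sum of the vectors of in-vocabulary words divided by the total number of words in the sentence, which would explain both the order independence and the all-zero vector for row 6. The plain-Scala sketch below illustrates that reading. The vocab map and its values are made up, and the averaging rule itself is an assumption about Spark's behavior rather than something this article confirms.

```scala
// Hypothetical word vectors; real ones come from a trained Word2VecModel.
val vocab: Map[String, Array[Double]] = Map(
  "spark" -> Array(0.3, -0.6, 0.9),
  "scala" -> Array(0.6, 0.0, -0.3)
)

// Sum the vectors of known words, then divide by the sentence length.
// Unknown words add nothing to the sum but still count in the length (assumed behavior).
def sentenceVector(words: Seq[String], dim: Int = 3): Array[Double] = {
  if (words.isEmpty) Array.fill(dim)(0.0)
  else {
    val sum = Array.fill(dim)(0.0)
    for (w <- words; v <- vocab.get(w); i <- 0 until dim) sum(i) += v(i)
    sum.map(_ / words.size)
  }
}

val a = sentenceVector(Seq("spark", "hell_no", "not-in-vocabularies"))
val b = sentenceVector(Seq("not-in-vocabularies", "hell_no", "spark"))
val allUnknown = sentenceVector(Seq("not-in-vocabularies", "neither", "no"))

println(a.mkString("[", ",", "]"))
println(b.mkString("[", ",", "]"))
println(allUnknown.mkString("[", ",", "]"))
```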

// anotherWord2VecModel is a separate model that is not defined in this snippet
anotherWord2VecModel.findSynonyms("developer", 5)
// +-----------+------------------+
// |       word|        similarity|
// +-----------+------------------+
// |        dev| 0.881394624710083|
// |development|0.7730562090873718|
// |       oier|0.6866029500961304|
// |  develover|0.6720684766769409|
// |     webdev|0.6582568883895874|
// +-----------+------------------+

ref:
https://spark.apache.org/docs/latest/ml-features.html#word2vec

Calculate the Pearson Correlation between Features

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Matrix
import org.apache.spark.ml.stat.Correlation
import org.apache.spark.sql.Row

val featureNames = Array("stargazers_count", "forks_count", "subscribers_count")
val vectorAssembler = new VectorAssembler()
  .setInputCols(featureNames)
  .setOutputCol("features")

val df = vectorAssembler.transform(rawRepoInfoDS)
val correlationDF = Correlation.corr(df, "features")
val Row(coeff: Matrix) = correlationDF.head

println(featureNames.mkString(", "))
println(coeff.toString)
// stargazers_count, forks_count, subscribers_count
// 1.0                 0.5336901230713282  0.7664204175159971  
// 0.5336901230713282  1.0                 0.5414244966152617  
// 0.7664204175159971  0.5414244966152617  1.0
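For reference, each off-diagonal entry in the matrix above is the Pearson coefficient of one pair of columns. A minimal plain-Scala sketch of the formula follows; it uses made-up numbers rather than the repo data, and pearson is a hypothetical helper, not part of Spark.

```scala
// Pearson correlation: covariance divided by the product of the standard deviations.
def pearson(xs: Seq[Double], ys: Seq[Double]): Double = {
  require(xs.size == ys.size && xs.nonEmpty, "inputs must be non-empty and of equal length")
  val meanX = xs.sum / xs.size
  val meanY = ys.sum / ys.size
  val cov = xs.zip(ys).map { case (x, y) => (x - meanX) * (y - meanY) }.sum
  val varX = xs.map(x => math.pow(x - meanX, 2)).sum
  val varY = ys.map(y => math.pow(y - meanY, 2)).sum
  cov / math.sqrt(varX * varY)
}

// Made-up counts for three repos; forks are exactly one tenth of stars.
val stars = Seq(10.0, 200.0, 3000.0)
val forks = Seq(1.0, 20.0, 300.0)
val r = pearson(stars, forks)
println(r)
```

Because the second series is a positive multiple of the first, the coefficient comes out at (approximately) 1; reversing one series would push it towards -1.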

ref:
https://spark.apache.org/docs/latest/ml-statistics.html

DIMSUM

import org.apache.spark.ml.linalg.DenseVector
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
import org.apache.spark.sql.Row

val repoWordRDD = repoVectorDF
  .select($"repo_id", $"text_w2v")
  .rdd
  .flatMap((row: Row) => {
    val repoId = row.getInt(0)
    val vector = row.getAs[DenseVector](1)
    vector.toArray.zipWithIndex.map({
      case (element, index) => MatrixEntry(repoId, index, element)
    })
  })
val repoWordMatrix = new CoordinateMatrix(repoWordRDD)
val wordRepoMatrix = repoWordMatrix.transpose

val repoSimilarityRDD = wordRepoMatrix
  .toRowMatrix
  .columnSimilarities(0.1)
  .entries
  // keep only the pairs whose similarity is at least 0.5
  .filter((entry: MatrixEntry) => entry.value >= 0.5)
  .map((entry: MatrixEntry) => (entry.i, entry.j, entry.value))
val repoSimilarityDF = spark.createDataFrame(repoSimilarityRDD).toDF("item_1", "item_2", "similarity")
repoSimilarityDF.show(false)

ref:
https://stackoverflow.com/questions/42455725/columnsimilarities-back-to-spark-data-frame
https://forums.databricks.com/questions/248/when-should-i-use-rowmatrixcolumnsimilarities.html

Train a Locality Sensitive Hashing (LSH) Model: Bucketed Random Projection LSH

To choose bucketLength when the input vectors are normalized, 1 to 10 times pow(numRecords, -1/inputDim) is a reasonable value. For instance, Math.pow(334913.0, -1.0 / 200.0) = 0.9383726472256705.
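The rule of thumb can be wrapped in a small helper to produce a few candidate values worth comparing. This is only a sketch: baseBucketLength and the multipliers are hypothetical choices, not something Spark provides.

```scala
// Rule-of-thumb base value for bucketLength: numRecords ^ (-1 / inputDim).
def baseBucketLength(numRecords: Long, inputDim: Int): Double =
  math.pow(numRecords.toDouble, -1.0 / inputDim)

// 334913 records with 200-dimensional vectors, as in the example in the text.
val base = baseBucketLength(334913L, 200)

// Candidate values between 1x and 10x the base, to compare experimentally.
val candidates = Seq(1.0, 2.0, 5.0, 10.0).map(_ * base)
candidates.foreach(println)
```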

import org.apache.spark.ml.feature.BucketedRandomProjectionLSH
import org.apache.spark.ml.linalg.Vectors

val userDF = spark.createDataFrame(Seq(
  (1, Vectors.sparse(6, Seq((0, -4.0), (1, 1.0), (2, 0.2)))),
  (2, Vectors.sparse(6, Seq((0, 5.5), (1, -0.6), (2, 9.0)))),
  (3, Vectors.sparse(6, Seq((1, 1.0), (2, 5.3), (4, 3.0)))),
  (4, Vectors.sparse(6, Seq((1, 1.0), (2, 1.0), (4, 1.0)))),
  (5, Vectors.sparse(6, Seq((2, 1.0), (5, -0.2)))),
  (6, Vectors.sparse(6, Seq((0, 0.7)))),
  (7, Vectors.sparse(6, Seq((1, 0.3), (2, 1.0))))
)).toDF("user_id", "features")

val repoDF = spark.createDataFrame(Seq(
  (11, Vectors.sparse(6, Seq((0, 1.0), (1, 1.0), (2, 1.0), (3, 1.0), (4, 1.0), (5, 1.0)))),
  (12, Vectors.sparse(6, Seq((0, 9.0), (1, -2.0), (2, -21.0), (3, 9.0), (4, 1.0), (5, 9.0)))),
  (13, Vectors.sparse(6, Seq((0, 1.0), (1, 1.0), (2, -3.0), (3, 3.0), (4, 7.0), (5, 9.0)))),
  (14, Vectors.sparse(6, Seq((0, 1.0), (1, 1.0), (2, -3.0)))),
  (15, Vectors.sparse(6, Seq((1, 1.0), (2, 1.0))))
)).toDF("repo_id", "features")

val lsh = new BucketedRandomProjectionLSH()
  .setBucketLength(0.6812920690579612)
  .setNumHashTables(4)
  .setInputCol("features")
  .setOutputCol("hashes")
val lshModel = lsh.fit(repoDF)

val hashedUserDF = lshModel.transform(userDF)
val hashedRepoDF = lshModel.transform(repoDF)
hashedRepoDF.show(false)
// +-------+----------------------------------------------+--------------------------------+
// |repo_id|features                                      |hashes                          |
// +-------+----------------------------------------------+--------------------------------+
// |11     |(6,[0,1,2,3,4,5],[1.0,1.0,1.0,1.0,1.0,1.0])   |[[1.0], [-2.0], [-1.0], [-1.0]] |
// |12     |(6,[0,1,2,3,4,5],[9.0,-2.0,-21.0,9.0,1.0,9.0])|[[21.0], [-28.0], [18.0], [0.0]]|
// |13     |(6,[0,1,2,3,4,5],[1.0,1.0,-3.0,3.0,7.0,9.0])  |[[4.0], [-10.0], [6.0], [-3.0]] |
// |14     |(6,[0,1,2],[1.0,1.0,-3.0])                    |[[2.0], [-3.0], [2.0], [1.0]]   |
// |15     |(6,[1,2],[1.0,1.0])                           |[[-1.0], [0.0], [-2.0], [0.0]]  |
// +-------+----------------------------------------------+--------------------------------+

val similarDF = lshModel
  .approxSimilarityJoin(hashedUserDF, hashedRepoDF, 10.0, "distance")
  .select($"datasetA.user_id".alias("user_id"), $"datasetB.repo_id".alias("repo_id"), $"distance")
  .orderBy($"user_id", $"distance".asc)
similarDF.show(false)
// +-------+-------+------------------+
// |user_id|repo_id|distance          |
// +-------+-------+------------------+
// |1      |15     |4.079215610874228 |
// |3      |15     |5.243090691567332 |
// |4      |15     |1.0               |
// |4      |11     |1.7320508075688772|
// |5      |15     |1.019803902718557 |
// |5      |11     |2.33238075793812  |
// |6      |15     |1.57797338380595  |
// |7      |15     |0.7               |
// |7      |11     |2.118962010041709 |
// +-------+-------+------------------+

val userVector = Vectors.sparse(6, Seq((0, 1.5), (1, 0.8), (2, 2.0)))
val singleSimilarDF = lshModel
  .approxNearestNeighbors(hashedRepoDF, userVector, 5, "distance")
  .select($"repo_id", $"features", $"distance")
singleSimilarDF.show(false)
// +-------+----------------------------------------------+------------------+
// |repo_id|features                                      |distance          |
// +-------+----------------------------------------------+------------------+
// |15     |(6,[1,2],[1.0,1.0])                           |1.8138357147217055|
// |12     |(6,[0,1,2,3,4,5],[9.0,-2.0,-21.0,9.0,1.0,9.0])|27.49709075520536 |
// +-------+----------------------------------------------+------------------+

The problem with approxSimilarityJoin() is that you can't control the number of generated items, while the disadvantage of approxNearestNeighbors() is that you have to iterate over all users manually to find similar items. Moreover, both methods can easily run into the infamous java.lang.OutOfMemoryError.
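One way to cap the number of items is to post-process the join output and keep only the N closest repos for each user. The sketch below shows the idea on plain Scala collections, using a few rows copied from the approxSimilarityJoin output above. topNPerUser is a hypothetical helper; on a real DataFrame the same step would need a distributed equivalent.

```scala
// (user_id, repo_id, distance) rows copied from the approxSimilarityJoin output.
val rows = Seq(
  (4, 15, 1.0),
  (4, 11, 1.7320508075688772),
  (5, 15, 1.019803902718557),
  (5, 11, 2.33238075793812),
  (7, 15, 0.7),
  (7, 11, 2.118962010041709)
)

// Keep the n closest repos per user, ordered by ascending distance.
def topNPerUser(rows: Seq[(Int, Int, Double)], n: Int): Map[Int, Seq[Int]] =
  rows
    .groupBy { case (user, _, _) => user }
    .map { case (user, userRows) =>
      user -> userRows.sortBy { case (_, _, distance) => distance }.take(n).map(_._2)
    }

val top1 = topNPerUser(rows, 1)
top1.foreach { case (user, repos) => println(s"user $user -> ${repos.mkString(", ")}") }
```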

ref:
https://spark.apache.org/docs/latest/ml-features.html#locality-sensitive-hashing

Train a Locality Sensitive Hashing (LSH) Model: MinHash LSH

MinHash LSH treats its input as a binary vector, that is, all non-zero values (including negative values) are treated as 1. As a result, a Word2Vec vector is not an appropriate input for MinHash LSH.

import org.apache.spark.ml.feature.MinHashLSH
import org.apache.spark.ml.linalg.Vectors

val userDF = spark.createDataFrame(Seq(
  (1, Vectors.sparse(6, Seq((0, -4.0), (1, 1.0), (2, 0.2)))),
  (2, Vectors.sparse(6, Seq((0, 5.5), (1, -0.6), (2, 9.0)))),
  (3, Vectors.sparse(6, Seq((1, 1.0), (2, 5.3), (4, 3.0)))),
  (4, Vectors.sparse(6, Seq((1, 1.0), (2, 1.0), (4, 1.0)))),
  (5, Vectors.sparse(6, Seq((2, 1.0), (5, -0.2)))),
  (6, Vectors.sparse(6, Seq((2, 0.7)))),
  (7, Vectors.sparse(6, Seq((3, 0.3), (5, 1.0))))
)).toDF("user_id", "features")

val repoDF = spark.createDataFrame(Seq(
  (11, Vectors.sparse(6, Seq((1, 1.0), (3, 1.0), (5, 1.0)))),
  (12, Vectors.sparse(6, Seq((2, 1.0), (3, 1.0), (5, 1.0)))),
  (13, Vectors.sparse(6, Seq((1, 1.0), (2, 1.0), (4, 1.0))))
)).toDF("repo_id", "features")

val lsh = new MinHashLSH()
  .setNumHashTables(4)
  .setInputCol("features")
  .setOutputCol("hashes")

val lshModel = lsh.fit(userDF)
val hashedUserDF = lshModel.transform(userDF)
val hashedRepoDF = lshModel.transform(repoDF)

hashedUserDF.show(false)
// user 1 and 2 have the same hashed vector
// user 3 and 4 have the same hashed vector
// +-------+--------------------------+-----------------------------------------------------------------------+
// |user_id|features                  |hashes                                                                 |
// +-------+--------------------------+-----------------------------------------------------------------------+
// |1      |(6,[0,1,2],[-4.0,1.0,0.2])|[[-2.031299587E9], [-1.974869772E9], [-1.974047307E9], [4.95314097E8]] |
// |2      |(6,[0,1,2],[5.5,-0.6,9.0])|[[-2.031299587E9], [-1.974869772E9], [-1.974047307E9], [4.95314097E8]] |
// |3      |(6,[1,2,4],[1.0,5.3,3.0]) |[[-2.031299587E9], [-1.974869772E9], [-1.230128022E9], [8.7126731E8]]  |
// |4      |(6,[1,2,4],[1.0,1.0,1.0]) |[[-2.031299587E9], [-1.974869772E9], [-1.230128022E9], [8.7126731E8]]  |
// |5      |(6,[2,5],[1.0,-0.2])      |[[-2.031299587E9], [-1.758749518E9], [-4.86208737E8], [-1.919887134E9]]|
// |6      |(6,[2],[0.7])             |[[-2.031299587E9], [-1.758749518E9], [-4.86208737E8], [1.247220523E9]] |
// |7      |(6,[3,5],[0.3,1.0])       |[[-1.278435698E9], [-1.542629264E9], [2.57710548E8], [-1.919887134E9]] |
// +-------+--------------------------+-----------------------------------------------------------------------+

val userSimilarRepoDF = lshModel
  .approxSimilarityJoin(hashedUserDF, hashedRepoDF, 0.6, "distance")
  .select($"datasetA.user_id".alias("user_id"), $"datasetB.repo_id".alias("repo_id"), $"distance")
  .orderBy($"user_id", $"distance".asc)

userSimilarRepoDF.show(false)
// +-------+-------+-------------------+
// |user_id|repo_id|distance           |
// +-------+-------+-------------------+
// |1      |13     |0.5                |
// |2      |13     |0.5                |
// |3      |13     |0.0                |
// |4      |13     |0.0                |
// |5      |12     |0.33333333333333337|
// |7      |12     |0.33333333333333337|
// |7      |11     |0.33333333333333337|
// +-------+-------+-------------------+
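The distances above can be checked by hand: MinHash is an LSH family for Jaccard distance, computed on the sets of non-zero indices while the values themselves are ignored. A minimal plain-Scala sketch, where jaccardDistance is a hypothetical helper and the index sets are read off the feature vectors defined earlier:

```scala
// Jaccard distance between two sets of non-zero indices: 1 - |A ∩ B| / |A ∪ B|.
def jaccardDistance(a: Set[Int], b: Set[Int]): Double = {
  require(a.nonEmpty || b.nonEmpty, "at least one set must be non-empty")
  1.0 - a.intersect(b).size.toDouble / a.union(b).size
}

// Non-zero indices taken from the feature vectors above.
val user1 = Set(0, 1, 2)
val user5 = Set(2, 5)
val repo12 = Set(2, 3, 5)
val repo13 = Set(1, 2, 4)

println(jaccardDistance(user1, repo13)) // 1 - 2/4
println(jaccardDistance(user5, repo12)) // 1 - 2/3
```

This also shows why users 1 and 2 are indistinguishable here: their vectors differ only in values, not in which indices are non-zero.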

ref:
https://databricks.com/blog/2017/05/09/detecting-abuse-scale-locality-sensitive-hashing-uber-engineering.html

Train a Logistic Regression Model

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.Vectors

val training = spark.createDataFrame(Seq(
  (1.0, Vectors.dense(1.0, 2.5, 0.0, 0.0)),
  (1.0, Vectors.dense(0.1, 9.0, 0.0, 0.0)),
  (1.0, Vectors.dense(0.0, 0.0, 1.0, 0.0)),
  (0.0, Vectors.dense(0.0, 0.0, 2.0, 9.0)),
  (0.0, Vectors.dense(1.0, 0.0, 0.0, 5.0))
)).toDF("label", "features")

val lr = new LogisticRegression()
  .setMaxIter(100)
  .setRegParam(0.0)
  .setElasticNetParam(0.0)
  .setFamily("binomial")
  .setFeaturesCol("features")
  .setLabelCol("label")

lr.explainParams()

val lrModel = lr.fit(training)

println(s"Coefficients: ${lrModel.coefficients}")
// [2.0149015925419,2.694173163503675,9.547978766053463,-5.592221425156231]

println(s"Intercept: ${lrModel.intercept}")
// 8.552229795281482

val result = lrModel.transform(test)

ref:
https://spark.apache.org/docs/latest/ml-classification-regression.html#logistic-regression
https://spark.apache.org/docs/latest/mllib-linear-methods.html#logistic-regression

import org.apache.spark.ml.classification.BinaryLogisticRegressionSummary

val binarySummary = lrModel.summary.asInstanceOf[BinaryLogisticRegressionSummary]
println(s"Area Under ROC: ${binarySummary.areaUnderROC}")

ref:
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.ml.classification.BinaryLogisticRegressionTrainingSummary

Evaluate a Binary Classification Model

import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.linalg.Vectors

val df = spark.createDataFrame(Seq(
  (Vectors.dense(0.0, 2.5), 1.0), // correct
  (Vectors.dense(1.0, 4.1), 1.0), // correct
  (Vectors.dense(9.2, 1.1), 0.0), // correct
  (Vectors.dense(1.0, 0.1), 0.0), // correct
  (Vectors.dense(5.0, 0.5), 1.0)  // incorrect
)).toDF("rawPrediction", "starring")

val evaluator = new BinaryClassificationEvaluator()
  .setMetricName("areaUnderROC")
  .setRawPredictionCol("rawPrediction")
  .setLabelCol("starring")
val metric = evaluator.evaluate(df)
// 0.8333333333333333
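To see where that number comes from, AUC can be read as the fraction of (positive, negative) pairs that are ranked in the right order. The plain-Scala sketch below recomputes it from the toy data, assuming the evaluator takes the second element of each rawPrediction vector as the score for the positive class (which is what the correct/incorrect comments above suggest). pairwiseAuc is a hypothetical helper, not Spark API.

```scala
// (score for the positive class, label) pairs; the score is the second element of each rawPrediction.
val scored = Seq((2.5, 1.0), (4.1, 1.0), (1.1, 0.0), (0.1, 0.0), (0.5, 1.0))

// AUC as the fraction of (positive, negative) pairs ranked correctly; ties count as half.
def pairwiseAuc(scored: Seq[(Double, Double)]): Double = {
  val positives = scored.collect { case (s, label) if label == 1.0 => s }
  val negatives = scored.collect { case (s, label) if label == 0.0 => s }
  require(positives.nonEmpty && negatives.nonEmpty, "need at least one sample of each class")
  val wins = for {
    p <- positives
    n <- negatives
  } yield if (p > n) 1.0 else if (p == n) 0.5 else 0.0
  wins.sum / (positives.size * negatives.size)
}

val auc = pairwiseAuc(scored)
println(auc)
```

Five of the six pairs are ordered correctly; the only miss is the positive sample scored 0.5, which ranks below the negative sample scored 1.1.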

ref:
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

Train an ALS Model

import org.apache.spark.ml.recommendation.ALS

val df = spark.createDataFrame(Seq(
  (1, 1, 12),
  (1, 2, 90),
  (1, 4, 4),
  (2, 4, 1),
  (3, 5, 8)
)).toDF("user", "item", "rating")

val als = new ALS()
  .setImplicitPrefs(true)
  .setRank(5)
  .setRegParam(0.5)
  .setAlpha(40)
  .setMaxIter(10)
  .setSeed(42)
  .setColdStartStrategy("drop")
val alsModel = als.fit(df)

val predictionDF = alsModel.transform(df)
// +----+----+------+----------+
// |user|item|rating|prediction|
// +----+----+------+----------+
// |   1|   1|    12| 0.9988487|
// |   3|   5|     8| 0.9984464|
// |   1|   4|     4|0.99887615|
// |   2|   4|     1| 0.9921428|
// |   1|   2|    90| 0.9997897|
// +----+----+------+----------+

predictionDF.printSchema()
// root
// |-- user: integer (nullable = false)
// |-- item: integer (nullable = false)
// |-- rating: integer (nullable = false)
// |-- prediction: float (nullable = false)

val userRecommendationsDF = alsModel.recommendForAllUsers(15)
// +----+-----------------------------------------------------------------+
// |user|recommendations                                                  |
// +----+-----------------------------------------------------------------+
// |1   |[[2,0.9997897], [4,0.9988761], [1,0.9988487], [5,0.0]]           |
// |3   |[[5,0.9984464], [1,2.9802322E-8], [2,0.0], [4,0.0]]              |
// |2   |[[4,0.9921428], [2,0.10759391], [1,0.10749264], [5,1.4901161E-8]]|
// +----+-----------------------------------------------------------------+

userRecommendationsDF.printSchema()
// root
// |-- user: integer (nullable = false)
// |-- recommendations: array (nullable = true)
// |    |-- element: struct (containsNull = true)
// |    |    |-- item: integer (nullable = true)
// |    |    |-- rating: float (nullable = true)

ref:
https://spark.apache.org/docs/latest/ml-collaborative-filtering.html

Save and Load an ALS Model

import org.apache.hadoop.mapred.InvalidInputException
import org.apache.spark.ml.recommendation.{ALS, ALSModel}

val alsModelSavePath = "./spark-data/20170902/alsModel.parquet"
val alsModel: ALSModel = try {
  ALSModel.load(alsModelSavePath)
} catch {
  case e: InvalidInputException => {
    if (e.getMessage().contains("Input path does not exist")) {
      val als = new ALS()
        .setImplicitPrefs(true)
        .setRank(100)
        .setRegParam(0.5)
        .setAlpha(40)
        .setMaxIter(22)
        .setSeed(42)
        .setColdStartStrategy("drop")
        .setUserCol("user_id")
        .setItemCol("repo_id")
        .setRatingCol("starring")
      val alsModel = als.fit(rawRepoStarringDS)
      alsModel.save(alsModelSavePath)
      alsModel
    } else {
      throw e
    }
  }
}

Create a Custom Transformer

package ws.vinta.albedo.transformers

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.{DoubleParam, Param, ParamMap}
import org.apache.spark.ml.util.{DefaultParamsWritable, Identifiable}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Dataset, Row}

import scala.collection.mutable

class NegativeBalancer(override val uid: String, val bcPopularItems: Broadcast[mutable.LinkedHashSet[Int]])
  extends Transformer with DefaultParamsWritable {

  def this(bcPopularItems: Broadcast[mutable.LinkedHashSet[Int]]) = {
    this(Identifiable.randomUID("negativeBalancer"), bcPopularItems)
  }

  val userCol = new Param[String](this, "userCol", "Name of the column containing users")

  def getUserCol: String = $(userCol)

  def setUserCol(value: String): this.type = set(userCol, value)
  setDefault(userCol -> "user")

  val itemCol = new Param[String](this, "itemCol", "Name of the column containing items")

  def getItemCol: String = $(itemCol)

  def setItemCol(value: String): this.type = set(itemCol, value)
  setDefault(itemCol -> "item")

  val labelCol = new Param[String](this, "labelCol", "Name of the column containing labels")

  def getLabelCol: String = $(labelCol)

  def setLabelCol(value: String): this.type = set(labelCol, value)
  setDefault(labelCol -> "label")

  val negativeValue = new DoubleParam(this, "negativeValue", "Label value assigned to negative samples")

  def getNegativeValue: Double = $(negativeValue)

  def setNegativeValue(value: Double): this.type = set(negativeValue, value)
  setDefault(negativeValue -> 0.0)

  val negativePositiveRatio = new DoubleParam(this, "negativePositiveRatio", "Ratio of negative samples to positive samples")

  def getNegativePositiveRatio: Double = $(negativePositiveRatio)

  def setNegativePositiveRatio(value: Double): this.type = set(negativePositiveRatio, value)
  setDefault(negativePositiveRatio -> 1.0)

  override def transformSchema(schema: StructType): StructType = {
    Map($(userCol) -> IntegerType, $(itemCol) -> IntegerType, $(labelCol) -> DoubleType)
      .foreach{
        case(columnName: String, expectedDataType: DataType) => {
          val actualDataType = schema(columnName).dataType
          require(actualDataType.equals(expectedDataType), s"Column $columnName must be of type $expectedDataType but was actually $actualDataType.")
        }
      }

    schema
  }

  override def transform(dataset: Dataset[_]): DataFrame = {
    transformSchema(dataset.schema)

    val popularItems: mutable.LinkedHashSet[Int] = this.bcPopularItems.value

    val emptyItemSet = new mutable.HashSet[Int]
    val addToItemSet = (itemSet: mutable.HashSet[Int], item: Int) => itemSet += item
    val mergeItemSets = (set1: mutable.HashSet[Int], set2: mutable.HashSet[Int]) => set1 ++= set2

    val getUserNegativeItems = (userItemsPair: (Int, mutable.HashSet[Int])) => {
      val (user, positiveItems) = userItemsPair
      val negativeItems = popularItems.diff(positiveItems)
      val requiredNegativeItemsCount = (positiveItems.size * this.getNegativePositiveRatio).toInt
      (user, negativeItems.slice(0, requiredNegativeItemsCount))
    }
    val expandNegativeItems = (userItemsPair: (Int, mutable.LinkedHashSet[Int])) => {
      val (user, negativeItems) = userItemsPair
      negativeItems.map({(user, _, $(negativeValue))})
    }

    import dataset.sparkSession.implicits._

    // TODO: currently assumes the incoming dataset contains only positive samples; datasets that already contain negative samples may need handling later
    val negativeDF = dataset
      .select($(userCol), $(itemCol))
      .rdd
      .map({
        case Row(user: Int, item: Int) => (user, item)
      })
      .aggregateByKey(emptyItemSet)(addToItemSet, mergeItemSets)
      .map(getUserNegativeItems)
      .flatMap(expandNegativeItems)
      .toDF($(userCol), $(itemCol), $(labelCol))

    dataset.select($(userCol), $(itemCol), $(labelCol)).union(negativeDF)
  }

  override def copy(extra: ParamMap): this.type = {
    defaultCopy(extra)
  }
}
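The core of the transformer is the per-user selection of negatives: take the popular items the user has not starred, in popularity order, and keep as many as the ratio allows. A minimal plain-Scala sketch of just that step, outside of Spark; the item ids are made up and negativeItemsFor is a hypothetical helper that mirrors getUserNegativeItems above.

```scala
import scala.collection.mutable

// Popular items in popularity order (made-up ids).
val popularItems = mutable.LinkedHashSet(101, 102, 103, 104, 105)

// Pick negatives for one user: popular items the user has not starred,
// capped at ratio * (number of positive items).
def negativeItemsFor(positiveItems: Set[Int], ratio: Double): Seq[Int] = {
  val requiredCount = (positiveItems.size * ratio).toInt
  popularItems.toSeq.filterNot(positiveItems.contains).take(requiredCount)
}

// The user starred item 102 (popular) and item 999 (not popular).
val negatives = negativeItemsFor(Set(102, 999), ratio = 1.0)
println(negatives.mkString(", "))
```

Using an insertion-ordered set means the most popular unstarred items are picked first, which makes the negatives harder than uniformly random ones.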

ref:
https://www.safaribooksonline.com/library/view/high-performance-spark/9781491943199/ch09.html#extending_spark_ml
https://stackoverflow.com/questions/40615713/how-to-write-a-custom-transformer-in-mllib
https://issues.apache.org/jira/browse/SPARK-17048

Create a Custom Evaluator

package ws.vinta.albedo.evaluators

import org.apache.spark.ml.evaluation.Evaluator
import org.apache.spark.ml.param.{Param, ParamMap}
import org.apache.spark.ml.util.{DefaultParamsWritable, Identifiable}
import org.apache.spark.mllib.evaluation.RankingMetrics
import org.apache.spark.sql.{DataFrame, Dataset, Row}

class RankingEvaluator(override val uid: String, val userActualItemsDF: DataFrame)
  extends Evaluator with DefaultParamsWritable {

  def this(userActualItemsDF: DataFrame) = {
    this(Identifiable.randomUID("rankingEvaluator"), userActualItemsDF)
  }

  val metricName = new Param[String](this, "metricName", "Evaluation metric")

  def getMetricName: String = $(metricName)

  def setMetricName(value: String): this.type = set(metricName, value)
  setDefault(metricName -> "ndcg@k")

  val k = new Param[Int](this, "k", "Only evaluate the ranking of the top k items")

  def getK: Int = $(k)

  def setK(value: Int): this.type = set(k, value)
  setDefault(k -> 15)

  override def isLargerBetter: Boolean = $(metricName) match {
    case "map" => true
    case "ndcg@k" => true
    case "precision@k" => true
  }

  override def evaluate(dataset: Dataset[_]): Double = {
    import dataset.sparkSession.implicits._

    val userPredictedItemsDF = dataset.select($"user_id", $"recommendations.repo_id".alias("items"))

    val bothItemsRDD = userPredictedItemsDF.join(userActualItemsDF, Seq("user_id"))
      .select(userPredictedItemsDF.col("items"), userActualItemsDF.col("items"))
      .rdd
      .map((row: Row) => {
        // Row(userPredictedItems, userActualItems)
        (row(0).asInstanceOf[Seq[Int]].toArray, row(1).asInstanceOf[Seq[Int]].toArray)
      })

    val rankingMetrics = new RankingMetrics(bothItemsRDD)
    val metric = $(metricName) match {
      case "map" => rankingMetrics.meanAveragePrecision
      case "ndcg@k" => rankingMetrics.ndcgAt($(k))
      case "precision@k" => rankingMetrics.precisionAt($(k))
    }
    metric
  }

  override def copy(extra: ParamMap): RankingEvaluator = {
    defaultCopy(extra)
  }
}
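To make the metrics less of a black box, here is a plain-Scala sketch of precision@k and NDCG@k for a single user with binary relevance. It follows the usual textbook definitions and may differ from RankingMetrics in edge cases; precisionAt and ndcgAt below are hypothetical helpers, not the Spark methods of the same name.

```scala
// Precision@k: share of the first k predicted items that are in the actual set.
def precisionAt(predicted: Seq[Int], actual: Set[Int], k: Int): Double = {
  require(k > 0, "k must be positive")
  predicted.take(k).count(actual.contains).toDouble / k
}

// NDCG@k with binary relevance: DCG of the prediction divided by the DCG of an ideal ranking.
def ndcgAt(predicted: Seq[Int], actual: Set[Int], k: Int): Double = {
  require(k > 0, "k must be positive")
  // Discounted gain of a hit at a 0-based position: 1 / log2(position + 2).
  def gain(position: Int): Double = 1.0 / (math.log(position + 2) / math.log(2))
  val dcg = predicted.take(k).zipWithIndex.collect {
    case (item, i) if actual.contains(item) => gain(i)
  }.sum
  val idealDcg = (0 until math.min(actual.size, k)).map(gain).sum
  if (idealDcg == 0.0) 0.0 else dcg / idealDcg
}

// Made-up recommendation list and ground truth for one user.
val predicted = Seq(1, 6, 2, 7, 8)
val actual = Set(1, 2, 3)

println(precisionAt(predicted, actual, 5))
println(ndcgAt(predicted, actual, 5))
```

Precision only counts hits, while NDCG also rewards placing them near the top, which is why it is a natural default for a Top N recommender.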

ref:
https://spark.apache.org/docs/latest/mllib-evaluation-metrics.html#ranking-systems
https://www.safaribooksonline.com/library/view/spark-the-definitive/9781491912201/ch19.html#s6c5---recommendation

Apply Transformer on Multiple Columns

import org.apache.spark.ml.feature._

val userCategoricalColumnNames = Array("account_type", "clean_company", "clean_email", "clean_location")
val userCategoricalTransformers = userCategoricalColumnNames.flatMap((columnName: String) => {
  val stringIndexer = new StringIndexer()
    .setInputCol(columnName)
    .setOutputCol(s"${columnName}_index")
    .setHandleInvalid("keep")
  val oneHotEncoder = new OneHotEncoder()
    .setInputCol(s"${columnName}_index")
    .setOutputCol(s"${columnName}_ohe")
    .setDropLast(true)
  Array(stringIndexer, oneHotEncoder)
})
userCategoricalTransformers.foreach(println)
// strIdx_4029f57e379a
// oneHot_f0decb92a05c
// strIdx_fb855ad6caaa
// oneHot_f1be19344002
// strIdx_7fa62a683293
// oneHot_097ae442d8fc
// strIdx_0ff7ffa022a1
// oneHot_4a9f72a7f5d8

ref:
https://stackoverflow.com/questions/34167105/using-spark-mls-onehotencoder-on-multiple-columns

Cross-validate a Pipeline Model

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

val vectorAssembler = new VectorAssembler()
  .setInputCols(Array("feature1", "feature2", "feature3"))
  .setOutputCol("features")

val lr = new LogisticRegression()
  .setFeaturesCol("features")
  .setLabelCol("starring")

val pipeline = new Pipeline()
  .setStages(Array(vectorAssembler, lr))

val paramGrid = new ParamGridBuilder()
  .addGrid(lr.maxIter, Array(20, 100))
  .addGrid(lr.regParam, Array(0.0, 0.5, 1.0, 2.0))
  .addGrid(lr.elasticNetParam, Array(0.0, 0.5, 1.0))
  .build()

val evaluator = new BinaryClassificationEvaluator()
  .setMetricName("areaUnderROC")
  .setRawPredictionCol("rawPrediction")
  .setLabelCol("starring")

val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEstimatorParamMaps(paramGrid)
  .setEvaluator(evaluator)
  .setNumFolds(3)

val cvModel = cv.fit(trainingDF)
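Before launching a grid search it helps to estimate how many models will be trained. The grid is the Cartesian product of the candidate values, each combination is fitted once per fold, and the best combination is then refitted on the full dataset. A back-of-the-envelope sketch in plain Scala, mirroring the values used above:

```scala
// Same candidate values as the ParamGridBuilder above.
val maxIters = Seq(20, 100)
val regParams = Seq(0.0, 0.5, 1.0, 2.0)
val elasticNetParams = Seq(0.0, 0.5, 1.0)
val numFolds = 3

// The grid is the Cartesian product of all candidate values.
val grid = for {
  maxIter <- maxIters
  regParam <- regParams
  elasticNet <- elasticNetParams
} yield (maxIter, regParam, elasticNet)

// One fit per combination per fold, plus one final refit of the best combination.
val totalFits = grid.size * numFolds + 1
println(s"${grid.size} combinations, $totalFits fits")
```

Since every stage of the pipeline is refitted each time, trimming the grid or caching the input DataFrame can save a lot of time.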

ref:
https://spark.apache.org/docs/latest/ml-tuning.html#cross-validation

Extract Best Parameters from a Cross-validation Model

import org.apache.spark.ml.PipelineModel
import org.apache.spark.ml.classification.LogisticRegressionModel

val bestPipelineModel = cvModel.bestModel.asInstanceOf[PipelineModel]
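// stages(0) is the VectorAssembler in the pipeline above; the LogisticRegression model is the second stage
val lrModel = bestPipelineModel.stages(1).asInstanceOf[LogisticRegressionModel]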
lrModel.extractParamMap()
// or
lrModel.explainParams()

ref:
https://stackoverflow.com/questions/31749593/how-to-extract-best-parameters-from-a-crossvalidatormodel

Show All Parameters of a Cross-validation Model

import org.apache.spark.ml.param.ParamMap

cvModel.getEstimatorParamMaps
  .zip(cvModel.avgMetrics)
  .sortWith(_._2 > _._2)
  .foreach((pair: (ParamMap, Double)) => {
    println(s"${pair._2}: ${pair._1}")
  })
// 0.8999999999999999: {
//     hashingTF_ac8be8d5806b-numFeatures: 1000,
//     logreg_9f79de6e51ec-regParam: 0.1
// }
// 0.8875: {
//     hashingTF_ac8be8d5806b-numFeatures: 100,
//     logreg_9f79de6e51ec-regParam: 0.1
// }
// 0.875: {
//     hashingTF_ac8be8d5806b-numFeatures: 100,
//     logreg_9f79de6e51ec-regParam: 0.01
// }

ref:
https://stackoverflow.com/questions/31749593/how-to-extract-best-parameters-from-a-crossvalidatormodel
https://alvinalexander.com/scala/how-sort-scala-sequences-seq-list-array-buffer-vector-ordering-ordered

Spark SQL cookbook (Scala)

Scala is the first-class language for interacting with Apache Spark, but it is difficult to learn. This article mostly covers how to operate on DataFrames and Datasets in Spark SQL.

Access SparkSession

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val conf = new SparkConf()
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

implicit val spark: SparkSession = SparkSession
  .builder()
  .appName("PersonalizedRanker")
  .config(conf)
  .getOrCreate()

val sc = spark.sparkContext
sc.setLogLevel("WARN")

Before Spark 2.0:

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

val conf: SparkConf = new SparkConf()
  .setAppName("PersonalizedRanker")
  .setMaster("local[*]")

val sc: SparkContext = new SparkContext(conf)

ref:
https://sparkour.urizone.net/recipes/understanding-sparksession/
https://spark.apache.org/docs/latest/configuration.html

Import Implicits

val spark = SparkSession.builder().getOrCreate()

import spark.implicits._

// you could also access SparkSession via any Dataset or DataFrame
import someDS.sparkSession.implicits._
import someDF.sparkSession.implicits._

val ratingDF = rawDF
  .selectExpr("from_user_id AS user", "repo_id AS item", "1 AS rating", "starred_at")
  .orderBy($"user", $"starred_at".desc)

Read Data from MySQL

import java.util.Properties

val dbUrl = "jdbc:mysql://mysql:3306/albedo?user=root&password=123&verifyServerCertificate=false&useSSL=false"
val prop = new Properties()
prop.put("driver", "com.mysql.jdbc.Driver")
val rawDF = spark.read.jdbc(dbUrl, "app_repostarring", prop)

ref:
https://docs.databricks.com/spark/latest/data-sources/sql-databases.html

Read Data from an Apache Parquet File

import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, SparkSession}

import spark.implicits._

case class RepoStarring(
  user_id: Int,
  repo_id: Int,
  starred_at: java.sql.Timestamp,
  starring: Double
)

val dataDir = "."
val today = "20170820"

val savePath = s"$dataDir/spark-data/$today/repoStarringDF.parquet"
val df: DataFrame = try {
  spark.read.parquet(savePath)
} catch {
  case e: AnalysisException => {
    if (e.getMessage().contains("Path does not exist")) {
      val df = spark.read.jdbc(dbUrl, "app_repostarring", prop)
        .select("user_id", "repo_id", "starred_at")
        .withColumn("starring", lit(1.0))
      df.write.mode("overwrite").parquet(savePath)
      df
    } else {
      throw e
    }
  }
}
df.as[RepoStarring]

Read Data from an Apache Avro File

$ spark-submit --packages "com.databricks:spark-avro_2.11:3.2.0"

import com.databricks.spark.avro._
import org.apache.spark.sql.SparkSession

implicit val spark = SparkSession
  .builder()
  .appName("Word2VecTrainer")
  .getOrCreate()

val df = spark.read.avro("./githubarchive_repo_info.avro")
df.show()

ref:
https://github.com/databricks/spark-avro

Create an RDD from a List of Case Class Instances

import org.apache.spark.rdd.RDD

case class WikipediaArticle(title: String, text: String) {
  def mentionsLanguage(lang: String): Boolean = text.split(' ').contains(lang)
}

val wikiRdd: RDD[WikipediaArticle] = sc.parallelize(Seq(
  WikipediaArticle("a1", "abc Scala xyz"),
  WikipediaArticle("a2", "abc Python xyz"),
  WikipediaArticle("a3", "abc Scala xyz"),
  WikipediaArticle("a4", "abc Scala xyz")
))

Get a Subset of an RDD

val subsetArr = someRDD.take(1000)
val subsetRDD = sc.parallelize(subsetArr)

Create a DataFrame from a Seq

import org.apache.spark.sql.functions._

val df = spark.createDataFrame(Seq(
    (1, 1, 1, "2017-05-16 20:01:00.0"),
    (1, 2, 1, "2017-05-17 21:01:00.0"),
    (2, 1, 0, "2017-05-18 22:01:00.0"),
    (3, 1, 1, "2017-05-10 22:01:00.0")
  ))
  .toDF("user", "item", "star", "starred_at")
  .withColumn("starred_at", unix_timestamp(col("starred_at")).cast("timestamp"))
// you could override the schema
// val df = spark.createDataFrame(df.rdd, starringSchema)

case class Starring(user: Int, item: Int, star: Int, starred_at: java.sql.Timestamp)
val ds = df.as[Starring]

df.printSchema()
// root
// |-- user: integer (nullable = false)
// |-- item: integer (nullable = false)
// |-- star: integer (nullable = false)
// |-- starred_at: timestamp (nullable = false)

ref:
https://medium.com/@mrpowers/manually-creating-spark-dataframes-b14dae906393

Create a Dataset

import spark.implicits._

val ds1 = spark.read.json("people.json").as[Person]
val ds2 = df.toDS
val ds3 = rdd.toDS

ref:
https://www.coursera.org/learn/scala-spark-big-data/lecture/yrfPh/datasets

Statistics

repoInfoDS.count()
// 16000

repoInfoDS
  .describe("language", "size", "stargazers_count", "forks_count", "subscribers_count", "open_issues_count")
  .show()
// +-------+--------+-----------------+-----------------+----------------+-----------------+-----------------+
// |summary|language|             size| stargazers_count|     forks_count|subscribers_count|open_issues_count|
// +-------+--------+-----------------+-----------------+----------------+-----------------+-----------------+
// |  count|   16000|            16000|            16000|           16000|            16000|            16000|
// |   mean|    null|    14777.8928125|      238.5754375|       61.164375|        20.802375|         9.896625|
// | stddev|    null|133926.8365289536|1206.220336641683|394.950236056925|84.09955924587845|56.72585435688847|
// |    min|   ANTLR|                0|                2|               0|                0|                0|
// |    max|    XSLT|          9427027|            56966|           28338|             4430|             3503|
// +-------+--------+-----------------+-----------------+----------------+-----------------+-----------------+

repoInfoDS.stat.corr("stargazers_count", "forks_count")
// 0.8408082958003276

repoInfoDS.stat.corr("stargazers_count", "size")
// 0.05351197356230549

repoInfoDS.stat.freqItems(Seq("language"), 0.1).show(truncate=false)
// +-----------------------------------------------------------+
// |language_freqItems                                         |
// +-----------------------------------------------------------+
// |[Ruby, Objective-C, C, Go, JavaScript, Swift, Java, Python]|
// +-----------------------------------------------------------+

ref:
https://databricks.com/blog/2015/06/02/statistical-and-mathematical-functions-with-dataframes-in-spark.html

Generate a Schema from a List of Column Names

import org.apache.spark.sql.types._

def generateSchema(columnNames: List[String]): StructType = {
  val fields = columnNames.map({
    case columnName: String if columnName == "tucaseid" => {
      StructField(columnName, StringType, nullable = false)
    }
    case columnName => {
      StructField(columnName, DoubleType, nullable = false)
    }
  })
  StructType(fields.toList)
}

val columnNames = List("tucaseid", "gemetsta", "gtmetsta", "peeduca", "pehspnon", "ptdtrace", "teage")
val schema = generateSchema(columnNames)

Generate a Schema from a Case Class

import java.sql.Timestamp
import org.apache.spark.sql.Encoders

case class Starring(user: Int, item: Int, star: Int, starred_at: Timestamp)
val starringSchema = Encoders.product[Starring].schema

ref:
https://stackoverflow.com/questions/36746055/generate-a-spark-structtype-schema-from-a-case-class

Change a DataFrame's Schema

val newDF = spark.createDataFrame(oldDF.rdd, starringSchema)

Print the Vector Length of Specific Columns

import org.apache.spark.ml.linalg.Vector 

userProfileDF.select("languages_preferences_w2v", "clean_bio_w2v")
  .head
  .toSeq.foreach((field: Any) => {
    println(field.asInstanceOf[Vector].size)
  })

Convert a DataFrame into a Dataset

import java.sql.Timestamp
import org.apache.spark.ml.feature.SQLTransformer

case class Starring(user: Int, item: Int, star: Int, starred_at: Timestamp)

val starringBuilder = new SQLTransformer()
val starringSQL = """
SELECT from_user_id AS user, repo_id AS item, 1 AS star, starred_at
FROM __THIS__
ORDER BY user, starred_at DESC
"""
starringBuilder.setStatement(starringSQL)
val starringDS = starringBuilder.transform(rawDF).as[Starring]

Convert a Dataset into a PairRDD

val pairRDD = dataset
  .select("user", "item")
  .rdd
  .map(row => (row(0), row(1)))

// or

case class UserItemPair(user: Int, item: Int)

val pairRDD = dataset
  .select("user", "item").as[UserItemPair]
  .rdd
  .map(row => (row.user, row.item))

// or

import org.apache.spark.sql.Row

val pairRDD = dataset
  .select("user", "item")
  .rdd
  .map({
    case Row(user: Int, item: Int) => (user, item)
  })

ref:
https://stackoverflow.com/questions/30655914/map-rdd-to-pairrdd-in-scala

Convert a Dataset into a List or Set

val popularRepos = popularReposDS
  .select("item")
  .rdd
  .map(r => r(0).asInstanceOf[Int])
  .collect()
  .to[List]

// or

val popularRepos = popularReposDS
  .select($"item".as[Int])
  .collect()
  .to[List]

// List(98837133, 97071697, 17439026, ...)

ref:
https://stackoverflow.com/questions/32000646/extract-column-values-of-dataframe-as-list-in-apache-spark

Add a Column to a DataFrame

import org.apache.spark.sql.functions.lit

val newDF = df.withColumn("starring", lit(1))

Repeat withColumn()

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lower}

val booleanColumnNames = Array("fork", "has_issues", "has_projects", "has_downloads", "has_wiki", "has_pages")
val convertedRepoInfoDF = booleanColumnNames.foldLeft[DataFrame](cleanRepoInfoDF)(
  (accDF, columnName) => accDF.withColumn(s"clean_$columnName", col(columnName).cast("int"))
)

// or

val lowerableColumnNames = Array("description", "language", "topics")
val booleanColumnNames = Array("fork", "has_issues", "has_projects", "has_downloads", "has_wiki", "has_pages")
val convertedRepoInfoDF = (lowerableColumnNames ++ booleanColumnNames).foldLeft[DataFrame](cleanRepoInfoDF)((accDF, columnName) => {
  columnName match {
    case _ if lowerableColumnNames.contains(columnName) => 
      accDF.withColumn(s"clean_$columnName", lower(col(columnName)))
    case _ if booleanColumnNames.contains(columnName) => 
      accDF.withColumn(s"clean_$columnName", col(columnName).cast("int"))
  }
})

ref:
https://stackoverflow.com/questions/41400504/spark-scala-repeated-calls-to-withcolumn-using-the-same-function-on-multiple-c

Flag Rows That Have a Null in Any Nullable Column

import org.apache.spark.sql.functions.when

val nullableColumnNames = Array("bio", "blog", "company", "email", "location", "name")

val df2 = df1.withColumn("has_null", when(nullableColumnNames.map(df1(_).isNull).reduce(_||_), true).otherwise(false))
// or
val df2 = df1.withColumn("has_null", when($"bio".isNull or
                                          $"blog".isNull or
                                          $"company".isNull or
                                          $"email".isNull or
                                          $"location".isNull or
                                          $"name".isNull, true).otherwise(false))

Combine Two Columns into an Array

import org.apache.spark.sql.functions._

fixRepoInfoDS
  .where("topics != ''")
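  // struct() packs both columns into one struct column; for a true array, use array() on columns of the same type
  .withColumn("tags", struct("language", "topics"))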
  .select("tags")
  .show(truncate=false)
// +----------------------------------------------------------------------+
// |tags                                                                  |
// +----------------------------------------------------------------------+
// |[Ruby,aasm,activerecord,mongoid,ruby,state-machine,transition]        |
// |[Ruby,captcha,rails,recaptcha,ruby,sinatra]                           |
// |[Python,commandline,gtd,python,sqlite,todo]                           |
// ...
// +----------------------------------------------------------------------+

Select Nested Column Directly

userRecommendationsDF.show(truncate=false)
// +----+-----------------------------------------------------------------+
// |user|recommendations                                                  |
// +----+-----------------------------------------------------------------+
// |1   |[[2,0.9997897], [4,0.9988761], [1,0.9988487], [5,0.0]]           |
// |3   |[[5,0.9984464], [1,2.9802322E-8], [2,0.0], [4,0.0]]              |
// |2   |[[4,0.9921428], [2,0.10759391], [1,0.10749264], [5,1.4901161E-8]]|
// +----+-----------------------------------------------------------------+

userRecommendationsDF.printSchema()
// root
// |-- user: integer (nullable = false)
// |-- recommendations: array (nullable = true)
// |    |-- element: struct (containsNull = true)
// |    |    |-- item: integer (nullable = true)
// |    |    |-- rating: float (nullable = true)

val userItemsDF = userRecommendationsDF.select($"user", $"recommendations.item")
// +----+------------+
// |user|        item|
// +----+------------+
// |   1|[2, 4, 1, 5]|
// |   3|[5, 1, 2, 4]|
// |   2|[4, 2, 1, 5]|
// +----+------------+

ref:
https://stackoverflow.com/questions/38413040/spark-sql-udf-with-complex-input-parameter

Flatten an Array Column

import org.apache.spark.sql.Row

val userRecommenderItemDF = alsModel.recommendForAllUsers(15)
  .flatMap((row: Row) => {
    val userID = row.getInt(0)
    val recommendations = row.getSeq[Row](1)
    recommendations.map {
      case Row(repoID: Int, score: Float) => {
        (userID, repoID, score, "als")
      }
    }
  })
  .toDF("user_id", "repo_id", "score", "source")

userRecommenderItemDF.show()
// +-------+-------+-------------+------+
// |user_id|repo_id|score        |source|
// +-------+-------+-------------+------+
// |2142   |48     |0.05021151   |als   |
// |2142   |118    |0.05021151   |als   |
// |2142   |68     |0.7791124    |als   |
// |2142   |98     |0.9939307    |als   |
// |2142   |28     |0.53719014   |als   |
// +-------+-------+-------------+------+

Define Conditional Columns

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._

val workingStatusProjection: Column = when($"telfs" >= 1 && $"telfs" < 3, "working").
                                      otherwise("not working").
                                      as("working")

val ageProjection: Column = when($"teage" >= 15 && $"teage" <= 22 , "young").
                            when($"teage" >= 23 && $"teage" <= 55 , "active").
                            otherwise("elder").
                            as("age")

val workProjection: Column = workColumns.reduce(_+_).divide(60).as("work_hours")

val newDF = df.select(workingStatusProjection, ageProjection, workProjection)
// +-----------+------+------------------+
// |    working|   age|        work_hours|
// +-----------+------+------------------+
// |    working| elder|               0.0|
// |    working|active|               0.0|
// |    working|active|               0.0|
// |not working|active|               2.0|
// |    working|active| 8.583333333333334|
// +-----------+------+------------------+

Create a Custom Transformation Function for any DataFrame

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.lit

def withGreeting(df: DataFrame): DataFrame = {
  df.withColumn("greeting", lit("hello world"))
}

def withCat(name: String)(df: DataFrame): DataFrame = {
  df.withColumn("cat", lit(s"$name meow"))
}

val df = Seq(
  "funny",
  "person"
).toDF("something")

val weirdDf = df
  .transform(withGreeting)
  .transform(withCat("kitten"))
// +---------+-----------+-----------+
// |something|   greeting|        cat|
// +---------+-----------+-----------+
// |    funny|hello world|kitten meow|
// |   person|hello world|kitten meow|
// +---------+-----------+-----------+

ref:
https://medium.com/@mrpowers/chaining-custom-dataframe-transformations-in-spark-a39e315f903c

Combine Two Arrays from Two DataFrames

import org.apache.spark.sql.Row

// joining on Seq("user_id") keeps a single user_id column in the result
val bothItemsRDD = userPredictedItemsDF.join(userActualItemsDF, Seq("user_id"))
  .select(userPredictedItemsDF.col("items"), userActualItemsDF.col("items"))
  .rdd
  .map({
    case Row(userPredictedItems: Seq[Int], userActualItems: Seq[Int]) => {
      (userPredictedItems.slice(0, $(k)).toArray, userActualItems.slice(0, $(k)).toArray)
    }
  })

ref:
https://stackoverflow.com/questions/28166555/how-to-convert-row-of-a-scala-dataframe-into-case-class-most-efficiently

Handle Missing Values (NaN and Null)

NaN stands for "Not a Number". It is usually the result of a floating-point operation that has no defined value, e.g. 0.0 / 0.0. Null, by contrast, means that the value is missing altogether.

val df1 = repoInfoDF.na.fill("")
val df2 = repoInfoDF.na.fill("", Seq("description", "homepage"))
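
To make the origin of NaN concrete, here is a minimal plain Scala sketch (no Spark required; all value names are made up for this illustration). It shows that only an undefined floating-point operation yields NaN, while dividing by zero gives Infinity for doubles and throws an exception for integers.

```scala
import scala.util.Try

// 0.0 / 0.0 has no defined value, so floating-point arithmetic returns NaN
val zeroDouble = 0.0
val notANumber = zeroDouble / zeroDouble

// a non-zero double divided by zero is Infinity, not NaN
val infinity = 5.0 / zeroDouble

// integer division by zero throws an ArithmeticException instead of returning a value
val zeroInt = 0
val intDivision = Try(5 / zeroInt)

// NaN never equals anything, including itself, so test for it with isNaN
println(notANumber.isNaN)         // true
println(notANumber == notANumber) // false
println(infinity.isInfinity)      // true
println(intDivision.isFailure)    // true
```

Spark keeps the two cases apart as well: isNull on a Column matches null, while isnan() from org.apache.spark.sql.functions matches NaN.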

ref:
https://stackoverflow.com/questions/33089781/spark-dataframe-handing-empty-string-in-onehotencoder
https://stackoverflow.com/questions/43882699/which-differences-there-are-between-null-and-nan-in-spark-how-to-deal-with

Calculate the Difference between Two Dates

import org.apache.spark.sql.functions._

df.withColumn("updated_at_days_since_today", datediff(current_date(), $"updated_at"))

ref:
https://databricks.com/blog/2015/09/16/apache-spark-1-5-dataframe-api-highlights.html

Create a User-Defined Function (UDF) which Accepts One Column

import org.apache.spark.sql.functions.udf

val df = Seq(
  (1, "Company Name Inc."),
  (2, "Company Name Co."), 
  (3, "Company Name ")
).toDF("id", "company")

// String.replaceAll() treats its first argument as a regex, where "." matches any character,
// so use replace() to remove these substrings literally
val removeUninformativeWords = (text: String) => {
  text
    .toLowerCase()
    .replace(", inc.", "")
    .replace(" inc.", "")
    .replace("-inc", "")
    .replace(" co., ltd.", "")
    .replace(" co.", "")
    .replace(" ltd.", "")
    .replace(".com", "")
    .replace("@", "")
    .trim()
}
val removeUninformativeWordsUDF = udf(removeUninformativeWords)

val newDF = df.withColumn("fix_company", removeUninformativeWordsUDF($"company"))
newDF.show()

ref:
https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-sql-udfs.html

Create a User-Defined Function (UDF) which Accepts Multiple Columns

When a UDF takes a StructType column as input, you must declare that parameter as Row (or Seq[Row] for an array of structs) instead of a custom case class.

import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.Row

userRecommendationsDF.printSchema()
// root
// |-- user: integer (nullable = false)
// |-- recommendations: array (nullable = true)
// |    |-- element: struct (containsNull = true)
// |    |    |-- item: integer (nullable = true)
// |    |    |-- rating: float (nullable = true)

val extractItemsUDF = udf((user: Int, recommendations: Seq[Row]) => {
  val items: Seq[Int] = recommendations.map(_.getInt(0))
  val ratings: Seq[Float] = recommendations.map(_.getFloat(1))
  items
})
userRecommendationsDF.withColumn("items", extractItemsUDF($"user", $"recommendations"))
// +----+-----------------------------------------------------------------+------------+
// |user|recommendations                                                  |items       |
// +----+-----------------------------------------------------------------+------------+
// |1   |[[2,0.9997897], [4,0.9988761], [1,0.9988487], [5,0.0]]           |[2, 4, 1, 5]|
// |3   |[[5,0.9984464], [1,2.9802322E-8], [2,0.0], [4,0.0]]              |[5, 1, 2, 4]|
// |2   |[[4,0.9921428], [2,0.10759391], [1,0.10749264], [5,1.4901161E-8]]|[4, 2, 1, 5]|
// +----+-----------------------------------------------------------------+------------+

ref:
https://stackoverflow.com/questions/32196207/derive-multiple-columns-from-a-single-column-in-a-spark-dataframe

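The "GenericRowWithSchema cannot be cast to XXX" issue:

There is a strict mapping between Spark SQL data types and Scala types, such as IntegerType vs. Int, TimestampType vs. java.sql.Timestamp and StructType vs. org.apache.spark.sql.Row. Declaring a UDF parameter with a Scala type outside this mapping, for example a case class for a StructType column, raises this ClassCastException at runtime.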

ref:
https://stackoverflow.com/questions/38413040/spark-sql-udf-with-complex-input-parameter
https://spark.apache.org/docs/latest/sql-programming-guide.html#data-types

Create a User-Defined Function (UDF) with Extra Parameters

import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions._

val df = spark.createDataFrame(Seq(
  ("Bob", 35, "Web Backend Developer"),
  ("Cathy", 24, "UX Designer"),
  ("Vic", 26, "PM"),
  ("Tom", 29, "Back end Engineer")
)).toDF("name", "age", "occupation")

// the return type of this UDF is Double
def containsAnyOfUDF(substrings: Array[String]): UserDefinedFunction = udf[Double, String]((text: String) => {
  // 1.0 if the text contains at least one of the substrings, otherwise 0.0
  if (substrings.exists(substring => text.contains(substring))) 1.0 else 0.0
})

val backends = Array("backend", "back end")
df.withColumn("knows_backend", containsAnyOfUDF(backends)(lower($"occupation")))
// +-----+---+---------------------+-------------+
// |name |age|occupation           |knows_backend|
// +-----+---+---------------------+-------------+
// |Bob  |35 |Web Backend Developer|1.0          |
// |Cathy|24 |UX Designer          |0.0          |
// |Vic  |26 |PM                   |0.0          |
// |Tom  |29 |Back end Engineer    |1.0          |
// +-----+---+---------------------+-------------+

ref:
https://stackoverflow.com/questions/35546576/how-can-i-pass-extra-parameters-to-udfs-in-sparksql

Select Rows that Contain Certain String in a DataFrame

userInfoDF
  .where("company LIKE '%Inc%'")
  .orderBy($"company".desc)

Order By One of the Elements in a List Column

import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.sql.functions.udf

val to_array = udf((v: Vector) => v.toDense.values)

resultTestDF
  .where("user_id = 652070")
  .orderBy(to_array($"probability").getItem(1).desc)
  .select("user_id", "repo_id", "starring", "prediction", "probability")
  .show(false)

Show Distinct Values

df.select($"company").distinct().orderBy($"company".desc)

import org.apache.spark.sql.functions.approx_count_distinct

imputedUserInfoDF.select(approx_count_distinct($"company")).show()
// 39581

ref:
https://databricks.com/blog/2016/05/19/approximate-algorithms-in-apache-spark-hyperloglog-and-quantiles.html

Group By then Aggregation

import org.apache.spark.sql.functions.count

val df = userInfoDF
  .groupBy($"company")
  .agg(count("user_id").alias("count"))
  .orderBy($"count".desc)
  .limit(1000)

Group By then Get First/Top n Items

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{collect_list, rank}

val topN = 10
val window = Window.partitionBy($"user_id").orderBy($"starred_at".desc)
val userLanguagesDF = repoInfoStarringDF
  // rank() gives tied rows the same rank, so a user can end up with more than topN rows
  .withColumn("rank", rank.over(window))
  .where($"rank" <= topN)
  .groupBy($"user_id")
  .agg(collect_list($"language").alias("languages"))
// +--------+-----------------------------------------------------------------+
// |user_id |languages                                                        |
// +--------+-----------------------------------------------------------------+
// |2142    |[JavaScript, Python, Python, Ruby, Go]                           |
// |59990   |[Python, JavaScript, Go, JavaScript, Go]                         |
// |101094  |[JavaScript, C++, Max, C, JavaScript]                            |
// |109622  |[PHP, PHP, PHP, PHP, Python]                                     |
// |201694  |[TypeScript, Python, Vim script, Vim script, Python]             |
// +--------+-----------------------------------------------------------------+

ref:
https://stackoverflow.com/questions/33655467/get-topn-of-all-groups-after-group-by-using-spark-dataframe
https://stackoverflow.com/questions/35918262/spark-topn-after-groupby

or

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

val df = spark.createDataFrame(Seq(
  (2, 1, 0, "2017-05-18 22:01:00.0"),
  (1, 1, 1, "2017-05-16 20:01:00.0"),
  (2, 5, 0, "2017-05-21 22:01:00.0"),
  (1, 2, 1, "2017-05-17 21:01:00.0"),
  (1, 4, 1, "2017-05-17 22:01:00.0"),
  (1, 7, 1, "2017-05-12 22:01:00.0"),
  (3, 1, 1, "2017-05-19 22:01:00.0"),
  (3, 2, 1, "2017-05-30 22:01:00.0"),
  (3, 5, 1, "2017-05-01 22:01:00.0"),
  (1, 6, 1, "2017-05-19 22:01:00.0")
)).toDF("user_id", "repo_id", "starring", "starred_at")

val windowSpec = Window.partitionBy($"user_id").orderBy($"starred_at".desc)
val userActualItemsDF = df
  .withColumn("row_number", row_number().over(windowSpec))
  .where($"row_number" <= 3)
// +-------+-------+--------+---------------------+----------+
// |user_id|repo_id|starring|starred_at           |row_number|
// +-------+-------+--------+---------------------+----------+
// |1      |6      |1       |2017-05-19 22:01:00.0|1         |
// |1      |4      |1       |2017-05-17 22:01:00.0|2         |
// |1      |2      |1       |2017-05-17 21:01:00.0|3         |
// |3      |2      |1       |2017-05-30 22:01:00.0|1         |
// |3      |1      |1       |2017-05-19 22:01:00.0|2         |
// |3      |5      |1       |2017-05-01 22:01:00.0|3         |
// |2      |5      |0       |2017-05-21 22:01:00.0|1         |
// |2      |1      |0       |2017-05-18 22:01:00.0|2         |
// +-------+-------+--------+---------------------+----------+

ref:
https://stackoverflow.com/questions/33878370/how-to-select-the-first-row-of-each-group
http://xinhstechblog.blogspot.tw/2016/04/spark-window-functions-for-dataframes.html

Group By then Concatenate Strings

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val userTopicsDF = repoInfoStarringDF
  .where($"topics" =!= "")
  .withColumn("rank", rank.over(Window.partitionBy($"user_id").orderBy($"starred_at".desc)))
  .where($"rank" <= 50)
  .groupBy($"user_id")
  .agg(concat_ws(",", collect_list($"topics")).alias("topics_preferences"))
// +--------+-------------------------------------------------------------------+
// | user_id|                                                 topics_preferences|
// +--------+-------------------------------------------------------------------+
// |    2142|go,golang,grpc,microservice,protobuf,consul,go,golang,load-balancer|
// |   59990|api,api-gateway,aws,aws-infrastructure,aws-lambda,deploy-tool      |
// |  101094|ableton,javascript,maxmsp,c,c89,gui,imgui,nuklear                  |
// |  109622|stripe,algolia,magento,magento-algoliasearch,php,search,aws        |
// +--------+-------------------------------------------------------------------+

ref:
https://community.hortonworks.com/questions/44886/dataframe-groupby-and-concat-non-empty-strings.html

Group By after Order By Maintains Order

val list = Seq(
  (2, 1, 0, "2017-05-18 22:01:00.0"),
  (1, 1, 1, "2017-05-16 20:01:00.0"),
  (2, 5, 0, "2017-05-21 22:01:00.0"),
  (1, 2, 1, "2017-05-17 21:01:00.0"),
  (1, 4, 1, "2017-05-17 22:01:00.0"),
  (3, 1, 1, "2017-05-19 22:01:00.0"),
  (3, 2, 1, "2017-05-30 22:01:00.0"),
  (3, 5, 1, "2017-05-01 22:01:00.0"),
  (1, 6, 1, "2017-05-19 22:01:00.0")
)
val df = list.toDF("user_id", "repo_id", "starring", "starred_at")

val userActualItemsDF = df
  .orderBy($"starred_at".desc)
  .groupBy($"user_id")
  .agg(collect_list($"repo_id").alias("items"))
// +-------+------------+
// |user_id|       items|
// +-------+------------+
// |      1|[6, 4, 2, 1]|
// |      3|   [2, 1, 5]|
// |      2|      [5, 1]|
// +-------+------------+
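
As far as I know, Spark does not document a guarantee that collect_list keeps the order of a preceding orderBy once groupBy shuffles the rows, so the result above may depend on the execution plan. The sketch below is one possible way to make the order explicit: it collects (starred_at, repo_id) structs and sorts them inside each group. The local SparkSession, the app name and the `starrings` column name are assumptions made for this example; the data is the same as above, and the timestamps are strings in a single format, so their lexicographic order matches their chronological order.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{collect_list, sort_array, struct}

// a local session, for illustration only
val spark = SparkSession.builder().master("local[*]").appName("OrderedCollectList").getOrCreate()
import spark.implicits._

val df = Seq(
  (2, 1, 0, "2017-05-18 22:01:00.0"),
  (1, 1, 1, "2017-05-16 20:01:00.0"),
  (2, 5, 0, "2017-05-21 22:01:00.0"),
  (1, 2, 1, "2017-05-17 21:01:00.0"),
  (1, 4, 1, "2017-05-17 22:01:00.0"),
  (3, 1, 1, "2017-05-19 22:01:00.0"),
  (3, 2, 1, "2017-05-30 22:01:00.0"),
  (3, 5, 1, "2017-05-01 22:01:00.0"),
  (1, 6, 1, "2017-05-19 22:01:00.0")
).toDF("user_id", "repo_id", "starring", "starred_at")

val userActualItemsDF = df
  .groupBy($"user_id")
  // starred_at comes first in the struct, so sort_array orders each group by it (newest first)
  .agg(sort_array(collect_list(struct($"starred_at", $"repo_id")), asc = false).alias("starrings"))
  // selecting a field of an array of structs returns an array of that field
  .select($"user_id", $"starrings.repo_id".alias("items"))

// collect the small result to the driver as user_id -> items
val itemsByUser = userActualItemsDF
  .collect()
  .map(row => row.getInt(0) -> row.getSeq[Int](1).toList)
  .toMap
```

With this data, `itemsByUser` holds the same lists as the output shown above, but the order no longer relies on how the rows happen to arrive at each group.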

ref:
https://stackoverflow.com/questions/39505599/spark-dataframe-does-groupby-after-orderby-maintain-that-order

collect_list Multiple Columns

import org.apache.spark.sql.functions.{collect_list, struct}

predictionDF
// +----+----+------+----------+
// |user|item|rating|prediction|
// +----+----+------+----------+
// |   1|   1|    12| 0.9988487|
// |   3|   5|     8| 0.9984464|
// |   1|   4|     4|0.99887615|
// |   2|   4|     1| 0.9921428|
// |   1|   2|    90| 0.9997897|
// +----+----+------+----------+

val userRecommendationsDF = predictionDF
  .groupBy($"user")
  .agg(collect_list(struct($"item", $"prediction")).alias("recommendations"))
// +----+----------------------------------------------+
// |user|recommendations                               |
// +----+----------------------------------------------+
// |1   |[[1,0.9988487], [4,0.99887615], [2,0.9997897]]|
// |3   |[[5,0.9984464]]                               |
// |2   |[[4,0.9921428]]                               |
// +----+----------------------------------------------+

groupByKey() on Dataset

val keyValues = List(
  (3, "Me"), (1, "Thi"), (2, "Se"), (3, "ssa"), (1, "sIsA"), (3, "ge:"), (3, "-"), (2, "cre"), (2, "t")
)
val keyValuesDS = keyValues.toDS
val ds = keyValuesDS
  .groupByKey(p => p._1)
  .mapValues(p => p._2)
  .reduceGroups((acc, str) => acc + str)

ref:
https://www.coursera.org/learn/scala-spark-big-data/lecture/yrfPh/datasets

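import org.apache.spark.sql.Encoders
import org.apache.spark.sql.expressions.scalalang.typed
import org.apache.spark.sql.functions.round

case class TimeUsageRow(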
  working: String,
  sex: String,
  age: String,
  primaryNeeds: Double,
  work: Double,
  other: Double
)

val summaryDs = doShit()
// +-----------+------+------+------------------+------------------+------------------+
// |    working|   sex|   age|      primaryNeeds|              work|             other|
// +-----------+------+------+------------------+------------------+------------------+
// |    working|  male| elder|             15.25|               0.0|              8.75|
// |    working|female|active|13.833333333333334|               0.0|10.166666666666666|
// |    working|female|active|11.916666666666666|               0.0|12.083333333333334|
// |not working|female|active|13.083333333333334|               2.0| 8.916666666666666|
// |    working|  male|active|11.783333333333333| 8.583333333333334|3.6333333333333333|
// +-----------+------+------+------------------+------------------+------------------+

val finalDs = summaryDs
  .groupByKey((row: TimeUsageRow) => {
    (row.working, row.sex, row.age)
  })
  .agg(
    round(typed.avg[TimeUsageRow](_.primaryNeeds), 1).as(Encoders.DOUBLE),
    round(typed.avg[TimeUsageRow](_.work), 1).as(Encoders.DOUBLE),
    round(typed.avg[TimeUsageRow](_.other), 1).as(Encoders.DOUBLE)
  )
  .map({
    case ((working, sex, age), primaryNeeds, work, other) => {
      TimeUsageRow(working, sex, age, primaryNeeds, work, other)
    }
  })
  .orderBy("working", "sex", "age")
  .as[TimeUsageRow]
// +-----------+------+------+------------+----+-----+
// |    working|   sex|   age|primaryNeeds|work|other|
// +-----------+------+------+------------+----+-----+
// |not working|female|active|        12.4| 0.5| 10.8|
// |not working|female| elder|        10.9| 0.4| 12.4|
// |not working|female| young|        12.5| 0.2| 11.1|
// |not working|  male|active|        11.4| 0.9| 11.4|
// |not working|  male| elder|        10.7| 0.7| 12.3|
// +-----------+------+------+------------+----+-----+

ref:
https://stackoverflow.com/questions/39946210/spark-2-0-datasets-groupbykey-and-divide-operation-and-type-safety

Aggregator

import org.apache.spark.sql.Encoder
import org.apache.spark.sql.expressions.Aggregator

// IN, BUF and OUT are placeholders for the input, buffer and output types
val myAgg = new Aggregator[IN, BUF, OUT] {
  def zero: BUF = ??? // the initial value
  def reduce(b: BUF, a: IN): BUF = ??? // add an element to the running total
  def merge(b1: BUF, b2: BUF): BUF = ??? // merge intermediate values
  def finish(b: BUF): OUT = ??? // return the final result
  override def bufferEncoder: Encoder[BUF] = ???
  override def outputEncoder: Encoder[OUT] = ???
}.toColumn
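
To show how the skeleton above can be filled in, here is a minimal sketch of a concrete Aggregator. `TotalLength` is a hypothetical aggregator written for this example (it is not part of Spark or albedo): it sums the lengths of the strings that share a key, reusing the key-value pairs from the groupByKey() example. The local SparkSession and its app name are also assumptions.

```scala
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator

// a local session, for illustration only
val spark = SparkSession.builder().master("local[*]").appName("AggregatorExample").getOrCreate()
import spark.implicits._

// hypothetical aggregator: IN = (Int, String), BUF = Int, OUT = Int
object TotalLength extends Aggregator[(Int, String), Int, Int] {
  // the initial value of the running total
  def zero: Int = 0
  // add the length of one string to the running total
  def reduce(total: Int, pair: (Int, String)): Int = total + pair._2.length
  // merge the totals computed on different partitions
  def merge(total1: Int, total2: Int): Int = total1 + total2
  // the buffer already holds the final result
  def finish(total: Int): Int = total
  override def bufferEncoder: Encoder[Int] = Encoders.scalaInt
  override def outputEncoder: Encoder[Int] = Encoders.scalaInt
}

val keyValuesDS = List(
  (3, "Me"), (1, "Thi"), (2, "Se"), (3, "ssa"), (1, "sIsA"), (3, "ge:"), (3, "-"), (2, "cre"), (2, "t")
).toDS

// agg() on a grouped Dataset takes the aggregator as a typed column
val totalLengths = keyValuesDS
  .groupByKey(p => p._1)
  .agg(TotalLength.toColumn)
  .collect()
  .toMap
// Map(1 -> 7, 2 -> 6, 3 -> 9)
```

A sum was chosen here because it does not depend on the order in which rows reach the aggregator, which keeps the result deterministic.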

ref:
https://www.coursera.org/learn/scala-spark-big-data/lecture/yrfPh/datasets

Join Two DataFrames

val starringDF = spark.createDataFrame(Seq(
    (652070, 21289110, 1),
    (652070, 4164482, 1),
    (652070, 44838949, 0),
    (1912583, 4164482, 0)
)).toDF("user_id", "repo_id", "starring")

val repoDF = spark.createDataFrame(Seq(
    (21289110, "awesome-python", "Python", 37336),
    (4164482, "django", "Python", 27451),
    (44838949, "swift", "C++", 39840)
)).toDF("id", "repo_name", "repo_language", "stargazers_count")

val fullDF = starringDF.join(repoDF, starringDF.col("repo_id") === repoDF.col("id"))
// +-------+--------+--------+--------+--------------+-------------+----------------+
// |user_id| repo_id|starring|      id|     repo_name|repo_language|stargazers_count|
// +-------+--------+--------+--------+--------------+-------------+----------------+
// | 652070|21289110|       1|21289110|awesome-python|       Python|           37336|
// | 652070| 4164482|       1| 4164482|        django|       Python|           27451|
// | 652070|44838949|       0|44838949|         swift|          C++|           39840|
// |1912583| 4164482|       0| 4164482|        django|       Python|           27451|
// +-------+--------+--------+--------+--------------+-------------+----------------+

ref:
https://docs.databricks.com/spark/latest/faq/join-two-dataframes-duplicated-column.html

Broadcast Join
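
This section has no snippet of its own, so here is a minimal sketch that reuses the two DataFrames from the join example above. The broadcast() function from org.apache.spark.sql.functions marks a DataFrame as small enough to be copied to every executor, which lets Spark avoid shuffling the larger side. The local SparkSession and its app name are assumptions made for this example.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

// a local session, for illustration only
val spark = SparkSession.builder().master("local[*]").appName("BroadcastJoinExample").getOrCreate()
import spark.implicits._

val starringDF = Seq(
  (652070, 21289110, 1),
  (652070, 4164482, 1),
  (652070, 44838949, 0),
  (1912583, 4164482, 0)
).toDF("user_id", "repo_id", "starring")

val repoDF = Seq(
  (21289110, "awesome-python", "Python", 37336),
  (4164482, "django", "Python", 27451),
  (44838949, "swift", "C++", 39840)
).toDF("id", "repo_name", "repo_language", "stargazers_count")

// hint that repoDF, the smaller side, should be broadcast to every executor
val fullDF = starringDF.join(broadcast(repoDF), starringDF.col("repo_id") === repoDF.col("id"))

// every starring row matches exactly one repo, so the join keeps all 4 rows
val joinedRowCount = fullDF.count()
```

Calling fullDF.explain() is a way to check which join strategy Spark actually picked. With DataFrames this small, Spark would likely broadcast one side even without the hint, since they fall below the spark.sql.autoBroadcastJoinThreshold setting.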

ref:
https://docs.cloud.databricks.com/docs/latest/databricks_guide/04%20SQL%2C%20DataFrames%20%26%20Datasets/05%20BroadcastHashJoin%20-%20scala.html