Build a recommender system with Spark: Logistic Regression

Build a recommender system with Spark: Logistic Regression

在這個系列的文章裡,我們將使用 Apache Spark、XGBoost、Elasticsearch 和 MySQL 等工具來搭建一個推薦系統的 Machine Learning Pipeline。推薦系統的組成可以粗略地分成 Candidate Generation 和 Ranking 兩個部分,前者是針對用戶產生候選物品集,常用的方法有 Collaborative Filtering、Content-based、標籤配對、熱門排行或人工精選等;後者則是對這些候選物品排序,以 Top N 的方式呈現最終的推薦結果,常用的方法有 Logistic Regression。

在本篇文章中,我們將以 Ranking 階段常用的方法之一:Logistic Regression 邏輯迴歸為例,利用 Apache Spark 的 Logistic Regression 模型建立一個 GitHub repositories 的推薦系統,以用戶對 repo 的打星紀錄和用戶與 repo 的各項屬性做為特徵,預測出用戶會不會打星某個 repo(分類問題)。最後訓練出來的模型就可以做為我們的推薦系統的 Ranking 模組。不過因為 LR 是線性模型,所以通常需要大量的 Feature Engineering 來習得非線性關係。所以這篇文章的重點是 Spark ML 的 Pipeline 機制和特徵工程,不會在演算法的部分著墨太多。

完整的程式碼可以在 https://github.com/vinta/albedo 找到。

系列文章:

Submit the Application

$ spark-submit \
--master spark://192.168.10.100:7077 \
--packages "com.github.fommil.netlib:all:1.1.2,com.hankcs:hanlp:portable-1.3.4,mysql:mysql-connector-java:5.1.41" \
--class ws.vinta.albedo.UserProfileBuilder \
target/albedo-1.0.0-SNAPSHOT.jar

$ spark-submit \
--master spark://192.168.10.100:7077 \
--packages "com.github.fommil.netlib:all:1.1.2,com.hankcs:hanlp:portable-1.3.4,mysql:mysql-connector-java:5.1.41" \
--class ws.vinta.albedo.RepoProfileBuilder \
target/albedo-1.0.0-SNAPSHOT.jar

$ spark-submit \
--master spark://192.168.10.100:7077 \
--packages "com.github.fommil.netlib:all:1.1.2,com.hankcs:hanlp:portable-1.3.4,mysql:mysql-connector-java:5.1.41" \
--class ws.vinta.albedo.LogisticRegressionRanker \
target/albedo-1.0.0-SNAPSHOT.jar

ref:
https://vinta.ws/code/setup-spark-scala-and-maven-with-intellij-idea.html
https://spark.apache.org/docs/latest/submitting-applications.html
https://spoddutur.github.io/spark-notes/distribution_of_executors_cores_and_memory_for_spark_application

Load Data

我們之前已經利用 GitHub API 和 BigQuery 上的 GitHub Archive 收集了 150 萬筆的打星紀錄和所屬的用戶、repo 數據。目前有以下幾個數據集,大致上是照著 GitHub API 建立的,欄位分別如下:

rawUserInfoDS.printSchema()
// root
 // |-- user_id: integer (nullable = true)
 // |-- user_login: string (nullable = true)
 // |-- user_account_type: string (nullable = true)
 // |-- user_name: string (nullable = true)
 // |-- user_company: string (nullable = true)
 // |-- user_blog: string (nullable = true)
 // |-- user_location: string (nullable = true)
 // |-- user_email: string (nullable = true)
 // |-- user_bio: string (nullable = true)
 // |-- user_public_repos_count: integer (nullable = true)
 // |-- user_public_gists_count: integer (nullable = true)
 // |-- user_followers_count: integer (nullable = true)
 // |-- user_following_count: integer (nullable = true)
 // |-- user_created_at: timestamp (nullable = true)
 // |-- user_updated_at: timestamp (nullable = true)

rawRepoInfoDS.printSchema()
// |-- repo_id: integer (nullable = true)
 // |-- repo_owner_id: integer (nullable = true)
 // |-- repo_owner_username: string (nullable = true)
 // |-- repo_owner_type: string (nullable = true)
 // |-- repo_name: string (nullable = true)
 // |-- repo_full_name: string (nullable = true)
 // |-- repo_description: string (nullable = true)
 // |-- repo_language: string (nullable = true)
 // |-- repo_created_at: timestamp (nullable = true)
 // |-- repo_updated_at: timestamp (nullable = true)
 // |-- repo_pushed_at: timestamp (nullable = true)
 // |-- repo_homepage: string (nullable = true)
 // |-- repo_size: integer (nullable = true)
 // |-- repo_stargazers_count: integer (nullable = true)
 // |-- repo_forks_count: integer (nullable = true)
 // |-- repo_subscribers_count: integer (nullable = true)
 // |-- repo_is_fork: boolean (nullable = true)
 // |-- repo_has_issues: boolean (nullable = true)
 // |-- repo_has_projects: boolean (nullable = true)
 // |-- repo_has_downloads: boolean (nullable = true)
 // |-- repo_has_wiki: boolean (nullable = true)
 // |-- repo_has_pages: boolean (nullable = true)
 // |-- repo_open_issues_count: integer (nullable = true)
 // |-- repo_topics: string (nullable = true)

rawStarringDS.printSchema()
// root
 // |-- user_id: integer (nullable = true)
 // |-- repo_id: integer (nullable = true)
 // |-- starred_at: timestamp (nullable = true)
 // |-- starring: double (nullable = true)

ref:
https://www.githubarchive.org/
http://ghtorrent.org/

載入資料之後,要做的第一件事應該就是 Exploratory Data Analysis (EDA) 了,把玩一下手上的數據。建議大家可以試試 Apache Zeppelin 或是 Databricks 的 Notebook,除了內建 Spark 支援的所有語言之外,也整合了 NoSQL 和 JDBC 支援的資料庫,要畫圖表也很方便,簡直比 Jupyter Notebook 還好用了。

ref:
https://zeppelin.apache.org/
https://databricks.com/

Build User Profile / Item Profile

在這個專案中,最主要的數據主體就是 user 和 repo,所以我們針對兩者各自建立 User Profile 和 Item Profile,作為之後在模型訓練階段會使用的特徵。我們把這個步驟跟模型訓練的流程分開,這樣對整個架構的搭建會更有彈性。實務上,我們可以用 user id 或 item id 當 key,直接把製作好的特徵存進 Redis 或其他 schemaless 的 NoSQL 資料庫,方便之後給多個模型取用;在做 real-time 推薦時,也可以很快地拿到特徵,只需要重新計算部份欄位即可。

不過因為這裡主要用的是來自 GitHub API 的資料,某種程度上人家已經幫我們做了很多資料清理和正規化的動作了,但是在現實中,你的系統要處理的數據通常不會這麼乾淨,可能來自各種 data source、有著各種格式,還會隨著時間而改變,通常得花上不少力氣做 Extract, Transform, Load (ETL),所以最好在寫 log(埋點)的時候就溝通好。而且在 production 環境中,數據是會一直變動的,要確保數據的時效性和容錯性,很重要的一個部分就是 monitoring。

ref:
http://www.algorithmdog.com/ad-rec-deploy
https://tech.meituan.com/online-feature-system02.html

礙於篇幅有限,以下的文章中我們只會挑幾個重要的部分說明。簡單說,在這個步驟的最後,我們會得到 userProfileDFrepoProfileDF 這兩個 DataFrame,分別存放製作好的特徵。詳細的程式碼如下:

ref:
https://github.com/vinta/albedo/blob/master/src/main/scala/ws/vinta/albedo/UserProfileBuilder.scala
https://github.com/vinta/albedo/blob/master/src/main/scala/ws/vinta/albedo/RepoProfileBuilder.scala

Feature Engineering

以推薦系統為例,特徵可以分成以下四種:

  • 用戶特徵:用戶本身的各種屬性,例如 user id、性別、職業或所在的城市等
  • 物品特徵:物品本身的各種屬性,例如 item id、作者、標題、分類、評分或所屬的標籤等
  • 交互特徵:用戶對物品做出的某項行為,該行為的 aggregation 或交叉特徵,例如是否看過同類型的電影、最近聽的歌曲的曲風分佈或上週買了多少高單價的商品
  • 上下文特徵:用戶對物品做出的某項行為,該行為的 metadata,例如發生的時間、使用的裝置或當前的 GPS 位置等

有些特徵是在資料採集階段就能拿到,有些特徵則會需要額外的步驟(例如透過外部的 API 或是其他模型)才能取得,也有些特徵必須即時更新。順道一提,因為我們要預測的是「某個用戶會不會打星某個 repo」,所以下述特徵裡的 user 可以是 repo stargazer 也可以是 repo owner。

原始特徵:

  • 用戶特徵
    • user_id
    • user_login
    • user_name
    • user_email
    • user_blog
    • user_bio
    • user_company
    • user_location
    • user_followers_coung
    • user_following_count
    • user_public_repos_count
    • user_public_gists_count
    • user_created_at
    • user_updated_at
  • 物品特徵
    • repo_id
    • repo_name
    • repo_owner
    • repo_owner_type
    • repo_language
    • repo_description
    • repo_homepage
    • repo_subscribers_count
    • repo_stargazers_count
    • repo_forks_count
    • repo_size
    • repo_created_at
    • repo_updated_at
    • repo_pushed_at
    • repo_has_issues
    • repo_has_projects
    • repo_has_downloads
    • repo_has_wiki
    • repo_has_pages
    • repo_open_issues_count
    • repo_topics
  • 交互特徵
    • user_stars_repo
    • user_follows_user
  • 上下文特徵
    • user_repo_starred_at

發想特徵:

  • 用戶特徵
    • user_days_between_created_at_today: 該用戶的註冊日期距離今天過了幾年
    • user_days_between_updated_at_today: 該用戶的更新日期距離今天過了幾天
    • user_repos_avg_stargazers_count: 該用戶名下的所有 repo(不含 fork 的)的平均星星數
    • user_organizations: 該用戶屬於哪些組織
    • user_has_null: 該用戶至少有一個欄位是 null
    • user_has_blog: 該用戶有沒有網站
    • user_is_freelancer: 該用戶的 bio 中是否包含 Freelancer 等字眼
    • user_is_junior: 該用戶的 bio 中是否包含 Beginner 或 Junior 等字眼
    • user_is_lead: 該用戶的 bio 中是否包含 Team Lead、Architect、Creator、CTO 或 VP of Engineering 等字眼
    • user_is_scholar: 該用戶的 bio 中是否包含 Researcher、Scientist、PhD 或 Professor 等字眼
    • user_is_pm: 該用戶的 bio 中是否包含 Product Manager 等字眼
    • user_knows_backend: 該用戶的 bio 中是否包含 Backend 或 Back end 等字眼
    • user_knows_data: 該用戶的 bio 中是否包含 Machine Learning、Deep Learning 或 Data Science 等字眼
    • user_knows_devops: 該用戶的 bio 中是否包含 DevOps、SRE、SysAdmin 或 Infrastructure 等字眼
    • user_knows_frontend: 該用戶的 bio 中是否包含 Frontend 或 Front end 等字眼
    • user_knows_mobile: 該用戶的 bio 中是否包含 Mobile、iOS 或 Android 等字眼
    • user_knows_recsys: 該用戶的 bio 中是否包含 Recommender System、Data Mining 或 Information Retrieval 等字眼
    • user_knows_web: 該用戶的 bio 中是否包含 Web Development 或 Fullstack 等字眼
  • 物品特徵
    • repo_created_at_days_since_today: 該 repo 的建立日期距離今天過了幾天
    • repo_updated_at_days_since_today: 該 repo 的更新日期距離今天過了幾天
    • repo_pushed_at_days_since_today: 該 repo 的提交日期距離今天過了幾天
    • repo_stargazers_count_in_30days: 該 repo 在 30 天內收到的星星數
    • repo_subscribers_stargazers_ratio: 該 repo 的 watch 數和 star 數的比例
    • repo_forks_stargazers_ratio: 該 repo 的 fork 數和 star 數的比例
    • repo_open_issues_stargazers_ratio: 該 repo 的 數和 star 數的比例
    • repo_releases_count: 該 repo 的 release 或 tag 數
    • repo_lisence: 該 repo 的授權條款
    • repo_readme: 該 repo 的 README 內容
    • repo_has_null: 該 repo 有至少一個欄位是 null
    • repo_has_readme: 該 repo 是否有 README 檔案
    • repo_has_changelog: 該 repo 是否有 CHANGELOG 檔案
    • repo_has_contributing: 該 repo 是否有 CONTRIBUTING 檔案
    • repo_has_tests: 該 repo 是否有測試
    • repo_has_ci: 該 repo 是否有 CI
    • repo_has_dockerfile: 該 repo 是否有 Dockerfile
    • repo_is_unmaintained: 該 repo 是否不再維護了
    • repo_is_awesome: 該 repo 是否被收錄進任何的 awesome-xxx 列表裡
    • repo_is_vinta_starred: 該 repo 是否被 @vinta aka 本文的作者打星了
  • 交互特徵
    • user_starred_repos_count: 該用戶總共打星了多少 repo
    • user_avg_daily_starred_repos_count: 該用戶平均每天打星多少 repo
    • user_forked_repos_count: 該用戶總共 fork 了多少 repo
    • user_follower_following_count_ratio: 該用戶的 follower 數和 following 數的比例
    • user_recent_searched_keywords: 該用戶最近搜尋的 50 個關鍵字
    • user_recent_commented_repos: 該用戶最近留言的 50 個 repo
    • user_recent_watched_repos: 該用戶最近訂閱的 50 個 repo
    • user_recent_starred_repos_descriptions: 該用戶最近打星的 50 個 repo 的描述
    • user_recent_starred_repos_languages: 該用戶最近打星的 50 個 repo 的語言
    • user_recent_starred_repos_topics: 該用戶最近打星的 50 個 repo 的標籤
    • user_follows_repo_owner: 該用戶是否追蹤該 repo 的作者
    • repo_language_index_in_user_recent_repo_languages: 該 repo 的語言出現在該用戶最近打星的語言列表的順序
    • repo_language_count_in_user_recent_repo_languages: 該 repo 的語言出現在該用戶最近打星的語言列表的次數
    • repo_topics_user_recent_topics_similarity: 該 repo 的標籤與該用戶最近打星的標籤列表的相似度
  • 上下文特徵
    • als_model_prediction: 來自 ALS 模型的預測值,該用戶對該 repo 的偏好程度
    • gbdt_model_index: 來自 GBDT 模型的 tree index,該 observation 的自動特徵

Feature Engineering 特徵工程中常見的方法
https://vinta.ws/code/feature-engineering.html

Detect Outliers

除了缺失值之外,離群值(異常值)也是需要注意的地方。如果是 continuous 特徵,用 box plot 可以很快地發現離群值;如果是 categorical 特徵,可以 SELECT COUNT(*) ... GROUP BY 一下,然後用 bar chart 查看每個 category 的數量。取決於你所要解決的問題,異常值可能可以直接忽略,也可能需要特別對待,例如搞清楚異常值出現的原因,是資料採集時的差錯或是某種隱含的深層的因素之類的。

ref:
https://www.analyticsvidhya.com/blog/2016/01/guide-data-exploration/
https://www.slideshare.net/tw_dsconf/123-70852901

Impute Missing Values

可以利用 df.describe().show() 查看各個欄位的統計數據:countmeanstddevminmax。除了使用 df.where("some_column IS NULL") 之外,比較不同欄位的 count 差異也可以很快地發現哪些欄位有缺失值。順便觀察一下有缺失值的欄位和 target variable 有沒有什麼關聯。

這裡直接對 nullNaN 數據填充缺失值,因為以下幾個欄位都是字串類型,所以直接改成空字串,方便後續的處理。然後順便做一個 has_null 的特徵。

針對 user:

import org.apache.spark.sql.functions._

val nullableColumnNames = Array("user_name", "user_company", "user_blog", "user_location", "user_bio")

val imputedUserInfoDF = rawUserInfoDS
  .withColumn("user_has_null", when(nullableColumnNames.map(rawUserInfoDS(_).isNull).reduce(_ || _), true).otherwise(false))
  .na.fill("", nullableColumnNames)

針對 repo:

import org.apache.spark.sql.functions._

val nullableColumnNames = Array("repo_description", "repo_homepage")

val imputedRepoInfoDF = rawRepoInfoDS
  .withColumn("repo_has_null", when(nullableColumnNames.map(rawRepoInfoDS(_).isNull).reduce(_ || _), true).otherwise(false))
  .na.fill("", nullableColumnNames)

ref:
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameNaFunctions

如果是數值類型的欄位可以考慮使用 Imputer

ref:
https://spark.apache.org/docs/latest/ml-features.html#imputer

Clean Data

針對 user,用 User-defined Function 對幾個文字欄位做一些正規化的處理:

import org.apache.spark.sql.functions._

val cleanUserInfoDF = imputedUserInfoDF
  .withColumn("user_clean_company", cleanCompanyUDF($"user_company"))
  .withColumn("user_clean_location", cleanLocationUDF($"user_location"))
  .withColumn("user_clean_bio", lower($"user_bio"))

針對 repo,過濾掉一些 repo_stargazers_count 太多和太少、description 欄位含有 "unmaintained" 或 "assignment" 等字眼的項目:

val reducedRepoInfo = imputedRepoInfoDF
  .where($"repo_is_fork" === false)
  .where($"repo_forks_count" <= 90000)
  .where($"repo_stargazers_count".between(30, 100000))

val unmaintainedWords = Array("%unmaintained%", "%no longer maintained%", "%deprecated%", "%moved to%")
val assignmentWords = Array("%assignment%", "%作業%", "%作业%")
val demoWords = Array("test", "%demo project%")
val blogWords = Array("my blog")

val cleanRepoInfoDF = reducedRepoInfo
  .withColumn("repo_clean_description", lower($"repo_description"))
  .withColumn("repo_is_unmaintained", when(unmaintainedWords.map($"repo_clean_description".like(_)).reduce(_ or _), true).otherwise(false))
  .withColumn("repo_is_assignment", when(assignmentWords.map($"repo_clean_description".like(_)).reduce(_ or _), true).otherwise(false))
  .withColumn("repo_is_demo", when(demoWords.map($"repo_clean_description".like(_)).reduce(_ or _) and $"repo_stargazers_count" <= 40, true).otherwise(false))
  .withColumn("repo_is_blog", when(blogWords.map($"repo_clean_description".like(_)).reduce(_ or _) and $"repo_stargazers_count" <= 40, true).otherwise(false))
  .where($"repo_is_unmaintained" === false)
  .where($"repo_is_assignment" === false)
  .where($"repo_is_demo" === false)
  .where($"repo_is_blog" === false)
  .withColumn("repo_clean_language", lower($"repo_language"))
  .withColumn("repo_clean_topics", lower($"repo_topics"))

Construct Features

針對 user,根據上述的「發想特徵」,製作出新的特徵:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val webThings = Array("web", "fullstack", "full stack")
val backendThings = Array("backend", "back end", "back-end")
val frontendThings = Array("frontend", "front end", "front-end")
val mobileThings = Array("mobile", "ios", "android")
val devopsThings = Array("devops", "sre", "admin", "infrastructure")
val dataThings = Array("machine learning", "deep learning", "data scien", "data analy")
val recsysThings = Array("data mining", "recommend", "information retrieval")

val leadTitles = Array("team lead", "architect", "creator", "director", "cto", "vp of engineering")
val scholarTitles = Array("researcher", "scientist", "phd", "professor")
val freelancerTitles = Array("freelance")
val juniorTitles = Array("junior", "beginner", "newbie")
val pmTitles = Array("product manager")

val userStarredReposCountDF = rawStarringDS
  .groupBy($"user_id")
  .agg(count("*").alias("user_starred_repos_count"))

val starringRepoInfoDF = rawStarringDS
  .select($"user_id", $"repo_id", $"starred_at")
  .join(rawRepoInfoDS, Seq("repo_id"))

val userTopLanguagesDF = starringRepoInfoDF
  .withColumn("rank", rank.over(Window.partitionBy($"user_id").orderBy($"starred_at".desc)))
  .where($"rank" <= 50)
  .groupBy($"user_id")
  .agg(collect_list(lower($"repo_language")).alias("user_recent_repo_languages"))
  .select($"user_id", $"user_recent_repo_languages")

val userTopTopicsDF = starringRepoInfoDF
  .where($"repo_topics" =!= "")
  .withColumn("rank", rank.over(Window.partitionBy($"user_id").orderBy($"starred_at".desc)))
  .where($"rank" <= 50)
  .groupBy($"user_id")
  .agg(concat_ws(",", collect_list(lower($"repo_topics"))).alias("temp_user_recent_repo_topics"))
  .select($"user_id", split($"temp_user_recent_repo_topics", ",").alias("user_recent_repo_topics"))

val userTopDescriptionDF = starringRepoInfoDF
  .where($"repo_description" =!= "")
  .withColumn("rank", rank.over(Window.partitionBy($"user_id").orderBy($"starred_at".desc)))
  .where($"rank" <= 50)
  .groupBy($"user_id")
  .agg(concat_ws(" ", collect_list(lower($"repo_description"))).alias("user_recent_repo_descriptions"))
  .select($"user_id", $"user_recent_repo_descriptions")

val constructedUserInfoDF = cleanUserInfoDF
  .withColumn("user_knows_web", when(webThings.map($"user_clean_bio".like(_)).reduce(_ or _), true).otherwise(false))
  .withColumn("user_knows_backend", when(backendThings.map($"user_clean_bio".like(_)).reduce(_ or _), true).otherwise(false))
  .withColumn("user_knows_frontend", when(frontendThings.map($"user_clean_bio".like(_)).reduce(_ or _), true).otherwise(false))
  .withColumn("user_knows_mobile", when(mobileThings.map($"user_clean_bio".like(_)).reduce(_ or _), true).otherwise(false))
  .withColumn("user_knows_devops", when(devopsThings.map($"user_clean_bio".like(_)).reduce(_ or _), true).otherwise(false))
  .withColumn("user_knows_data", when(dataThings.map($"user_clean_bio".like(_)).reduce(_ or _), true).otherwise(false))
  .withColumn("user_knows_recsys", when(recsysThings.map($"user_clean_bio".like(_)).reduce(_ or _), true).otherwise(false))
  .withColumn("user_is_lead", when(leadTitles.map($"user_clean_bio".like(_)).reduce(_ or _), true).otherwise(false))
  .withColumn("user_is_scholar", when(scholarTitles.map($"user_clean_bio".like(_)).reduce(_ or _), true).otherwise(false))
  .withColumn("user_is_freelancer", when(freelancerTitles.map($"user_clean_bio".like(_)).reduce(_ or _), true).otherwise(false))
  .withColumn("user_is_junior", when(juniorTitles.map($"user_clean_bio".like(_)).reduce(_ or _), true).otherwise(false))
  .withColumn("user_is_pm", when(pmTitles.map($"user_clean_bio".like(_)).reduce(_ or _), true).otherwise(false))
  .withColumn("user_followers_following_ratio", round($"user_followers_count" / ($"user_following_count" + lit(1.0)), 3))
  .withColumn("user_days_between_created_at_today", datediff(current_date(), $"user_created_at"))
  .withColumn("user_days_between_updated_at_today", datediff(current_date(), $"user_updated_at"))
  .join(userStarredReposCountDF, Seq("user_id"))
  .withColumn("user_avg_daily_starred_repos_count", round($"user_starred_repos_count" / ($"user_days_between_created_at_today" + lit(1.0)), 3))
  .join(userTopDescriptionDF, Seq("user_id"))
  .join(userTopTopicsDF, Seq("user_id"))
  .join(userTopLanguagesDF, Seq("user_id"))

針對 repo,根據上述的「發想特徵」,製作出新的特徵,意思到了就好:

import org.apache.spark.sql.functions._

val vintaStarredRepos = rawStarringDS
  .where($"user_id" === 652070)
  .select($"repo_id".as[Int])
  .collect()
  .to[List]

val constructedRepoInfoDF = cleanRepoInfoDF
  .withColumn("repo_has_activities_in_60days", datediff(current_date(), $"repo_pushed_at") <= 60)
  .withColumn("repo_has_homepage", when($"repo_homepage" === "", false).otherwise(true))
  .withColumn("repo_is_vinta_starred", when($"repo_id".isin(vintaStarredRepos: _*), true).otherwise(false))
  .withColumn("repo_days_between_created_at_today", datediff(current_date(), $"repo_created_at"))
  .withColumn("repo_days_between_updated_at_today", datediff(current_date(), $"repo_updated_at"))
  .withColumn("repo_days_between_pushed_at_today", datediff(current_date(), $"repo_pushed_at"))
  .withColumn("repo_subscribers_stargazers_ratio", round($"repo_subscribers_count" / ($"repo_stargazers_count" + lit(1.0)), 3))
  .withColumn("repo_forks_stargazers_ratio", round($"repo_forks_count" / ($"repo_stargazers_count" + lit(1.0)), 3))
  .withColumn("repo_open_issues_stargazers_ratio", round($"repo_open_issues_count" / ($"repo_stargazers_count" + lit(1.0)), 3))
  .withColumn("repo_text", lower(concat_ws(" ", $"repo_owner_username", $"repo_name", $"repo_language", $"repo_description")))

ref:
https://databricks.com/blog/2015/09/16/apache-spark-1-5-dataframe-api-highlights.html

Convert Features

針對 user,這裡主要是對一些 categorical 特徵作 binning:

import org.apache.spark.sql.functions._

val companyCountDF = cleanUserInfoDF
  .groupBy($"user_clean_company")
  .agg(count("*").alias("count_per_user_company"))

val locationCountDF = cleanUserInfoDF
  .groupBy($"user_clean_location")
  .agg(count("*").alias("count_per_user_location"))

val transformedUserInfoDF = constructedUserInfoDF
  .join(companyCountDF, Seq("user_clean_company"))
  .join(locationCountDF, Seq("user_clean_location"))
  .withColumn("user_has_blog", when($"user_blog" === "", 0.0).otherwise(1.0))
  .withColumn("user_binned_company", when($"count_per_user_company" <= 5, "__other").otherwise($"user_clean_company"))
  .withColumn("user_binned_location", when($"count_per_user_location" <= 50, "__other").otherwise($"user_clean_location"))

針對 repo:

import org.apache.spark.sql.functions._

val languagesDF = cleanRepoInfoDF
  .groupBy($"repo_clean_language")
  .agg(count("*").alias("count_per_repo_language"))
  .select($"repo_clean_language", $"count_per_repo_language")
  .cache()

val transformedRepoInfoDF = constructedRepoInfoDF
  .join(languagesDF, Seq("repo_clean_language"))
  .withColumn("repo_binned_language", when($"count_per_repo_language" <= 30, "__other").otherwise($"repo_clean_language"))
  .withColumn("repo_clean_topics", split($"repo_topics", ","))

ref:
https://docs.databricks.com/spark/latest/mllib/binary-classification-mllib-pipelines.html

Prepare the Feature Pipeline

我們過濾掉那些打星了超多 repo 的用戶。從收集到的數據發現,有些用戶甚至打星了一兩萬個 repo,這些用戶可能是個爬蟲專用帳號或是他看到什麼就打星什麼,推薦系統對這樣的用戶來說可能沒什麼意義,還不如從數據集中拿掉。

import org.apache.spark.sql.functions._

val maxStarredReposCount = 2000

val userStarredReposCountDF = rawStarringDS
  .groupBy($"user_id")
  .agg(count("*").alias("user_starred_repos_count"))

val reducedStarringDF = rawStarringDS
  .join(userStarredReposCountDF, Seq("user_id"))
  .where($"user_starred_repos_count" <= maxStarredReposCount)
  .select($"user_id", $"repo_id", $"starred_at", $"starring")

val profileStarringDF = reducedStarringDF
  .join(userProfileDF, Seq("user_id"))
  .join(repoProfileDF, Seq("repo_id"))

Build the Feature Pipeline

把處理特徵的一連串流程寫成 Spark ML Pipeline,方便抽換或是加入新的 Transformer,例如 Standardization、One-hot Encoding 和 Word2Vec,也把 ALS 模型的預測值做為其中一項特徵。

import org.apache.spark.ml.feature._
import org.apache.spark.ml.recommendation.ALSModel
import ws.vinta.albedo.transformers.UserRepoTransformer

val profileStarringDF = reducedStarringDF
  .join(userProfileDF, Seq("user_id"))
  .join(repoProfileDF, Seq("repo_id"))
  .cache()

categoricalColumnNames += "user_id"
categoricalColumnNames += "repo_id"

val userRepoTransformer = new UserRepoTransformer()
  .setInputCols(Array("repo_language", "user_recent_repo_languages"))

continuousColumnNames += "repo_language_index_in_user_recent_repo_languages"
continuousColumnNames += "repo_language_count_in_user_recent_repo_languages"

val alsModelPath = s"${settings.dataDir}/${settings.today}/alsModel.parquet"
val alsModel = ALSModel.load(alsModelPath)
  .setUserCol("user_id")
  .setItemCol("repo_id")
  .setPredictionCol("als_score")
  .setColdStartStrategy("drop")

continuousColumnNames += "als_score"

val categoricalTransformers = categoricalColumnNames.flatMap((columnName: String) => {
  val stringIndexer = new StringIndexer()
    .setInputCol(columnName)
    .setOutputCol(s"${columnName}__idx")
    .setHandleInvalid("keep")

  val oneHotEncoder = new OneHotEncoder()
    .setInputCol(s"${columnName}__idx")
    .setOutputCol(s"${columnName}__ohe")
    .setDropLast(false)

  Array(stringIndexer, oneHotEncoder)
})

val listTransformers = listColumnNames.flatMap((columnName: String) => {
  val countVectorizerModel = new CountVectorizer()
    .setInputCol(columnName)
    .setOutputCol(s"${columnName}__cv")
    .setMinDF(10)
    .setMinTF(1)

  Array(countVectorizerModel)
})

val textTransformers = textColumnNames.flatMap((columnName: String) => {
  val hanLPTokenizer = new HanLPTokenizer()
    .setInputCol(columnName)
    .setOutputCol(s"${columnName}__words")
    .setShouldRemoveStopWords(true)

  val stopWordsRemover = new StopWordsRemover()
    .setInputCol(s"${columnName}__words")
    .setOutputCol(s"${columnName}__filtered_words")
    .setStopWords(StopWordsRemover.loadDefaultStopWords("english"))
  val word2VecModelPath = s"${settings.dataDir}/${settings.today}/word2VecModel.parquet"
  val word2VecModel = Word2VecModel.load(word2VecModelPath)
    .setInputCol(s"${columnName}__filtered_words")
    .setOutputCol(s"${columnName}__w2v")

  Array(hanLPTokenizer, stopWordsRemover, word2VecModel)
})

val finalBooleanColumnNames = booleanColumnNames.toArray
val finalContinuousColumnNames = continuousColumnNames.toArray
val finalCategoricalColumnNames = categoricalColumnNames.map(columnName => s"${columnName}__ohe").toArray
val finalListColumnNames = listColumnNames.map(columnName => s"${columnName}__cv").toArray
val finalTextColumnNames = textColumnNames.map(columnName => s"${columnName}__w2v").toArray
val vectorAssembler = new SimpleVectorAssembler()
  .setInputCols(finalBooleanColumnNames ++ finalContinuousColumnNames ++ finalCategoricalColumnNames ++ finalListColumnNames ++ finalTextColumnNames)
  .setOutputCol("features")

val featureStages = mutable.ArrayBuffer.empty[PipelineStage]
featureStages += userRepoTransformer
featureStages += alsModel
featureStages ++= categoricalTransformers
featureStages ++= listTransformers
featureStages ++= textTransformers
featureStages += vectorAssembler

val featurePipeline = new Pipeline().setStages(featureStages.toArray)
val featurePipelineModel = featurePipeline.fit(profileStarringDF)

ref:
https://spark.apache.org/docs/latest/ml-pipeline.html
https://spark.apache.org/docs/latest/ml-features.html

Handle Imbalanced Data

因為我們要訓練一個 Binary Classification 二元分類模型,會同時需要 positive(正樣本)和 negative(負樣本)。但是我們的原始數據 rawStarringDS 都是正樣本,也就是說我們只有「用戶有對哪些 repo 打星的資料」(正樣本),卻沒有「用戶沒有對哪些 repo 打星的資料」(負樣本)。我們當然是可以用「所有用戶沒有打星的 repo 做為負樣本」,但是考慮到這種做法產生的負樣本的數量實在太大,而且也不太合理,因為那些用戶沒有打星的 repo 不見得是因為他不喜歡,可能只是因為他不知道有那個 repo 存在。

我們後來採用的做法是「用熱門但是用戶沒有打星的 repo 做為負樣本」,我們寫了一個 Spark Transformer 來做這件事:

import ws.vinta.albedo.transformers.NegativeBalancer

import scala.collection.mutable

val sc = spark.sparkContext

val popularReposDS = loadPopularRepoDF()
val popularRepos = popularReposDS
  .select($"repo_id".as[Int])
  .collect()
  .to[mutable.LinkedHashSet]
val bcPopularRepos = sc.broadcast(popularRepos)

val negativeBalancer = new NegativeBalancer(bcPopularRepos)
  .setUserCol("user_id")
  .setItemCol("repo_id")
  .setTimeCol("starred_at")
  .setLabelCol("starring")
  .setNegativeValue(0.0)
  .setNegativePositiveRatio(2.0)
val balancedStarringDF = negativeBalancer.transform(reducedStarringDF)

ref:
https://github.com/vinta/albedo/blob/master/src/main/scala/ws/vinta/albedo/evaluators/RankingEvaluator.scala
http://www.kdnuggets.com/2017/06/7-techniques-handle-imbalanced-data.html

Split Data

直接使用 holdout 的方式,隨機分配不同的 row 到 training set 和 test set。其他的做法可能是根據時間來拆分,用以前的數據來預測之後的行為。

val profileBalancedStarringDF = balancedStarringDF
  .join(userProfileDF, Seq("user_id"))
  .join(repoProfileDF, Seq("repo_id"))

val tmpDF = featurePipelineModel.transform(profileBalancedStarringDF)
val keepColumnName = tmpDF.columns.filter((columnName: String) => {
  !columnName.endsWith("__idx") &&
  !columnName.endsWith("__ohe") &&
  !columnName.endsWith("__cv") &&
  !columnName.endsWith("__words") &&
  !columnName.endsWith("__filtered_words") &&
  !columnName.endsWith("__w2v")
})
val featuredBalancedStarringDF = tmpDF.select(keepColumnName.map(col): _*)

val Array(trainingFeaturedDF, testFeaturedDF) = featuredBalancedStarringDF.randomSplit(Array(0.9, 0.1))

Build the Model Pipeline

為了方便之後的擴充性,這裡也使用 Spark ML Pipeline 的寫法。Spark ML 的 LogisticRegression 可以額外設置一個 weightCol 來調整不同 row 的權重。

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.{Pipeline, PipelineStage}

import scala.collection.mutable

val weightSQL = """
SELECT *,
       1.0 AS default_weight,
       IF (starring = 1.0, 0.9, 0.1) AS positive_weight,
       IF (starring = 1.0 AND datediff(current_date(), starred_at) <= 365, 0.9, 0.1) AS recent_starred_weight
FROM __THIS__
""".stripMargin
val weightTransformer = new SQLTransformer()
  .setStatement(weightSQL)

val lr = new LogisticRegression()
  .setMaxIter(200)
  .setRegParam(0.7)
  .setElasticNetParam(0.0)
  .setStandardization(true)
  .setLabelCol("starring")
  .setFeaturesCol("standard_features")
  .setWeightCol("recent_starred_weight")

val modelStages = mutable.ArrayBuffer.empty[PipelineStage]
modelStages += weightTransformer
modelStages += lr

val modelPipeline = new Pipeline().setStages(modelStages.toArray)
val modelPipelineModel = modelPipeline.fit(trainingFeaturedDF)

ref:
https://spark.apache.org/docs/latest/ml-classification-regression.html

Evaluate the Model: Classification

因為 Logistic Regression 是二元分類模型,所以我們可以用 Spark ML 的 BinaryClassificationEvaluator 來評估結果。不過因為我們做的是推薦系統,真正在乎的是 Top N 的排序問題,所以這裡的 AUC 的數值參考一下就好。

import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

val testRankedDF = modelPipelineModel.transform(testFeaturedDF)

val binaryClassificationEvaluator = new BinaryClassificationEvaluator()
  .setMetricName("areaUnderROC")
  .setRawPredictionCol("rawPrediction")
  .setLabelCol("starring")

val classificationMetric = binaryClassificationEvaluator.evaluate(testRankedDF)
println(s"${binaryClassificationEvaluator.getMetricName} = $classificationMetric")
// areaUnderROC = 0.9450631491281277

ref:
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
https://docs.databricks.com/spark/latest/mllib/binary-classification-mllib-pipelines.html

Generate Candidates

推薦系統的另外一個重要部分就是產生候選物品集,這裡我們使用以下幾種方式:

  • ALS: 協同過濾的推薦
  • Content-based: 基於內容的推薦
  • Popularity: 基於熱門的推薦

不過因為這篇文章的主題是排序和特徵工程的 Machine Learning Pipeline,所以產生候選物品集的部分就不多說了,有興趣的人可以直接看底下連結的 source code 或是這個系列的其他文章。

import ws.vinta.albedo.recommenders.ALSRecommender
import ws.vinta.albedo.recommenders.ContentRecommender
import ws.vinta.albedo.recommenders.PopularityRecommender

val topK = 30

val alsRecommender = new ALSRecommender()
  .setUserCol("user_id")
  .setItemCol("repo_id")
  .setTopK(topK)

val contentRecommender = new ContentRecommender()
  .setUserCol("user_id")
  .setItemCol("repo_id")
  .setTopK(topK)
  .setEnableEvaluationMode(true)

val popularityRecommender = new PopularityRecommender()
  .setUserCol("user_id")
  .setItemCol("repo_id")
  .setTopK(topK)

val recommenders = mutable.ArrayBuffer.empty[Recommender]
recommenders += alsRecommender
recommenders += contentRecommender
recommenders += popularityRecommender

val candidateDF = recommenders
  .map((recommender: Recommender) => recommender.recommendForUsers(testUserDF))
  .reduce(_ union _)
  .select($"user_id", $"repo_id")
  .distinct()

// 每個 Recommender 的結果類似這樣:
// +-------+-------+----------+------+
// |user_id|repo_id|score     |source|
// +-------+-------+----------+------+
// |652070 |1239728|0.6731846 |als   |
// |652070 |854078 |0.7187486 |als   |
// |652070 |1502338|0.70165294|als   |
// |652070 |1184678|0.7434903 |als   |
// |652070 |547708 |0.7956538 |als   |
// +-------+-------+----------+------+

ref:
https://github.com/vinta/albedo/blob/master/src/main/scala/ws/vinta/albedo/recommenders/ALSRecommender.scala
https://github.com/vinta/albedo/blob/master/src/main/scala/ws/vinta/albedo/recommenders/ContentRecommender.scala
https://github.com/vinta/albedo/blob/master/src/main/scala/ws/vinta/albedo/recommenders/PopularityRecommender.scala

Predict the Ranking

把這些候選物品集丟給我們訓練好的 Logistic Regression 模型來排序。結果中的 probability 欄位的第 0 項表示結果為 0 的機率(negative)、第 1 項表示結果為 1 的機率(positive)。

val profileCandidateDF = candidateDF
  .join(userProfileDF, Seq("user_id"))
  .join(repoProfileDF, Seq("repo_id"))

val featuredCandidateDF = featurePipelineModel
  .transform(profileCandidateDF)

val rankedCandidateDF = modelPipelineModel
  .transform(featuredCandidateDF)

// rankedCandidateDF 的結果類似這樣:
// +-------+--------+----------+----------------------------------------+
// |user_id|repo_id |prediction|probability                             |
// +-------+--------+----------+----------------------------------------+
// |652070 |83467664|1.0       |[0.12711894229094317,0.8728810577090568]|
// |652070 |55099616|1.0       |[0.1422859437320775,0.8577140562679224] |
// |652070 |42266235|1.0       |[0.1462014853157966,0.8537985146842034] |
// |652070 |78012800|1.0       |[0.15576081067098502,0.844239189329015] |
// |652070 |5928761 |1.0       |[0.16149848941925066,0.8385015105807493]|
// +-------+--------+----------+----------------------------------------+

ref:
https://stackoverflow.com/questions/37903288/what-do-colum-rawprediction-and-probability-of-dataframe-mean-in-spark-mllib

Evaluate the Model: Ranking

最後我們使用 Information Retrieval 領域中用來評價排序能力的指標 NDCG (Normalized Discounted Cumulative Gain) 來評估排序的結果。Spark MLlib 有現成的 RankingMetrics 可以用,但是它只適用於 RDD-based 的 API,所以我們改寫成適合 DataFrame-based 的 Evaluator

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import ws.vinta.albedo.evaluators.RankingEvaluator

val userActualItemsDF = reducedStarringDF
  .withColumn("rank", rank().over(Window.partitionBy($"user_id").orderBy($"starred_at".desc)))
  .where($"rank" <= topK)
  .groupBy($"user_id")
  .agg(collect_list($"repo_id").alias("items"))

val userPredictedItemsDF = rankedCandidateDF
  .withColumn("rank", rank().over(Window.partitionBy($"user_id").orderBy(toArrayUDF($"probability").getItem(1).desc)))
  .where($"rank" <= topK)
  .groupBy($"user_id")
  .agg(collect_list($"repo_id").alias("items"))

val rankingEvaluator = new RankingEvaluator(userActualItemsDF)
  .setMetricName("[email protected]")
  .setK(topK)
  .setUserCol("user_id")
  .setItemsCol("items")
val rankingMetric = rankingEvaluator.evaluate(userPredictedItemsDF)
println(s"${rankingEvaluator.getFormattedMetricName} = $rankingMetric")
// [email protected] = 0.021114356461615493

ref:
https://github.com/vinta/albedo/blob/master/src/main/scala/ws/vinta/albedo/evaluators/RankingEvaluator.scala
https://spark.apache.org/docs/latest/mllib-evaluation-metrics.html#ranking-systems
https://weekly.codetengu.com/issues/83#kOxuVxW

碼天狗週刊 第 100 期 @vinta - Apache Spark, Scala, Machine Learning, Feature Engineering, MySQL

碼天狗週刊 第 100 期 @vinta - Apache Spark, Scala, Machine Learning, Feature Engineering, MySQL

本文同步發表於 CodeTengu Weekly - Issue 100

Big Data Analysis with Scala and Spark

因為前陣子辭職了(想放個長假吶~),突然多了不少時間,所以決定在打電動之餘,花點時間上幾門 Coursera 的課。然後花了一個禮拜終於完成這門課啦!主題是 Scala 和 Spark,是 Functional Programming in Scala 系列課程的最後一門課(這個系列有一門課的老師就是 Scala 的發明者 Martin Odersky)。之前學東西都習慣看書,這次第一次在 Coursera 完整地上完一門課,老實說是個很棒的體驗啊。尤其是在寫第一個程式作業的時候,上面標註說大概要花 3 小時,結果我寫了一個下午哈哈哈。雖然上手之後,後來的作業其實很快就做完了。但是還是忍不住想抱怨一下:Spark 的 Dataset typed API 寫起來也太麻煩,而且效能還沒有比較好。

題外話,雖然還沒那麼快要開始找工作,不過因為很閒,這陣子新認識了不少工程師同業,交流了很多技術經驗(和業界八卦 XD),感覺挺不錯的啊,所以想說如果各位朋友或公司有興趣,歡迎聯絡我,咱們可以約個時間吃個飯 👍

題外話之二,說到打電動,跟大家分享一下,12 月的時候 PC/PS4/Xbox One 會出「大神 絕景版」,這款遊戲可是 PS2 時代不朽的名作之一啊,如果你還沒有玩過,拜託玩一下。然!後!登登!小島秀夫的傑作之一 Anubis: Z.O.E. 也要重製啦!

Rules of Machine Learning: Best Practices for ML Engineering

不得了啊,這份文件,有在搞機器學習的人,這禮拜讀這篇就夠啦。這份文件的作者是 Google 的 Research Scientist,歸納了 43 條搭建一個 Machine Learning 系統的最佳實踐,而且很多都是從實務和軟體工程角度的經驗總結,這種知識特別寶貴啊。老實說我覺得這也是軟體工程師在這一波 AI 浪潮中可以施力的點,因為任何的機器學習系統或產品,宏觀一點來看,它就是一個軟體工程問題。

Most of the problems you will face are, in fact, engineering problems.

延伸閱讀(RecsysChina 的前輩對這篇文章的評註):

Feature Engineering - Getting most out of data for predictive models

這份簡報很豐富,除了幾乎把我前陣子才讀完的 Mastering Feature Engineering 書裡的內容都講完了之外,也提到很多特別的作法。而且看到最後一頁才發現,原來簡報的作者也讀過這本書啊。

是說我這一陣子實際搗鼓了一番 Feature Engineering 之後的感想,特徵工程根本就是手藝活,講求的其實是創造力啊。

延伸閱讀:

What happens when your application cannot open yet another connection to MySQL

之前工作的時候,要把 Spark 運算完的推薦系統的結果寫回 MySQL 資料庫,結果卻遇到了 ERROR 2003 (HY000): Can't connect to MySQL server on '192.168.x.x' (99) 的錯誤,很多人應該都見過類似的錯誤訊息,不過括號裡的數字可能不一樣。在這個例子中,單看文字的描述很容易讓人誤會是 MySQL server 出了什麼差錯,但是其實關鍵在於最後那個括號裡的 OS error code。這篇文章把造成這個錯誤的前因後果說得非常清楚,值得一讀。

簡單說,用 perror 99 可以查到這個 error code 是 Cannot assign requested address 的意思,原來是因為我在寫 Spark 的時候太放肆,不小心在短時間內開了太多的 connection,把 local 的 port 用完了(需要 TIME_WAIT 冷卻時間),所以沒有辦法分配 port 給新的 MySQL connection。解決的辦法:在程式裡重用 MySQL connection 或是限制一下 concurrent 數,再不然就是修改 net.ipv4.tcp_tw_reuse = 1 系統設定。

忍不住提一下,其實 master/slave 架構的 MySQL 還是很罩啊,當初ㄙㄨㄚˋ地一下在幾分鐘內寫了四十幾億筆資料進去,MySQL 跟沒事一樣。原本還ㄏㄧㄠˊ咖稱想用 Cassandra,結果發現在現階段用 MySQL 就頂得住啦。

延伸閱讀:

Spark ML cookbook (Scala)

Spark ML cookbook (Scala)

Scala is the first class citizen language for interacting with Apache Spark, but it's difficult to learn. This article is mostly about Spark ML - the new Spark Machine Learning library which was rewritten in DataFrame-based API.

Convert a String Categorical Feature into Numeric One

StringIndexer converts labels (categorical values) into numbers (0.0, 1.0, 2.0 and so on) which ordered by label frequencies, the most frequnet label gets 0. This method is able to handle unseen labels with optional strategies.

StringIndexer's inputCol accepts string, numeric and boolean types.

val df1 = spark.createDataFrame(Seq(
    (1, "Python"),
    (2, "C++"),
    (3, "C++"),
    (4, "JavaScript"),
    (5, "Python"),
    (6, "Python"),
    (7, "Go")
BinaryClassificationEvaluator
)).toDF("repo_id", "repo_language")

val df2 = spark.createDataFrame(Seq(
    (1, "Python"),
    (2, "C++"),
    (3, "C++"),
    (4, "JavaScript"),
    (5, "Python"),
    (6, "Python"),
    (7, "Go"),
    (8, "JavaScript"),
    (9, "Brainfuck"),
    (10, "Brainfuck"),
    (11, "Red")
)).toDF("repo_id", "repo_language")

import org.apache.spark.ml.feature.StringIndexer

val stringIndexer = new StringIndexer()
  .setInputCol("repo_language")
  .setOutputCol("repo_language_index")
  .setHandleInvalid("keep")
val stringIndexerModel = stringIndexer.fit(df1)

stringIndexerModel.labels
// Array[String] = Array(Python, C++, JavaScript, Go)

val indexedDF = stringIndexerModel.transform(df2)
indexedDF.show()
// +-------+-------------+-------------------+
// |repo_id|repo_language|repo_language_index|
// +-------+-------------+-------------------+
// |      1|       Python|                0.0|
// |      2|          C++|                1.0|
// |      3|          C++|                1.0|
// |      4|   JavaScript|                3.0|
// |      5|       Python|                0.0|
// |      6|       Python|                0.0|
// |      7|           Go|                2.0|
// |      8|   JavaScript|                3.0|
// |      9|    Brainfuck|                4.0| <- previously unseen
// |     10|    Brainfuck|                4.0| <- previously unseen
// |     11|          Red|                4.0| <- previously unseen
// +-------+-------------+-------------------+

ref:
https://spark.apache.org/docs/latest/ml-features.html#stringindexer
https://stackoverflow.com/questions/34681534/spark-ml-stringindexer-handling-unseen-labels
https://stackoverflow.com/questions/32277576/how-to-handle-categorical-features-with-spark-ml/32278617

Convert an Indexed Numeric Feature Back to the Original Categorical One

import org.apache.spark.ml.feature.IndexToString

val indexToString = new IndexToString()
  .setInputCol("repo_language_index")
  .setOutputCol("repo_language_ori")

val oriIndexedDF = indexToString.transform(indexedDF)
oriIndexedDF.show()
// +-------+-------------+-------------------+----------------------+
// |repo_id|repo_language|repo_language_index|     repo_language_ori|
// +-------+-------------+-------------------+----------------------+
// |      1|       Python|                0.0|                Python|
// |      2|          C++|                1.0|                   C++|
// |      3|          C++|                1.0|                   C++|
// |      4|   JavaScript|                2.0|            JavaScript|
// |      5|       Python|                0.0|                Python|
// |      6|       Python|                0.0|                Python|
// |      7|           Go|                3.0|                    Go|
// |      8|   JavaScript|                2.0|            JavaScript|
// |      9|    Brainfuck|                4.0|             __unknown| <- previously unseen
// |     10|    Brainfuck|                4.0|             __unknown| <- previously unseen
// |     11|          Red|                4.0|             __unknown| <- previously unseen
// +-------+-------------+-------------------+----------------------+

ref:
https://spark.apache.org/docs/latest/ml-features.html#indextostring

One-hot Encoding for Categorical Features

OneHotEncoder's input column only accepts numeric types. If you have string columns, you need to use StringIndexer to transform them into doubles, bessides, StringIndexer is able to properly deal with unseen values. In my humble opinion, you should always apply StringIndexer before OneHotEncoder.

Be careful that OneHotEncoder's vector length will be the maximun value in the column, you must apply OneHotEncoder on the union dataset of both training set and test set. Since OneHotEncoder does not accept empty string for name, you need to replace all empty strings with a placeholder, something like __empty.

import org.apache.spark.ml.feature.OneHotEncoder

val knownDF = spark.createDataFrame(Seq(
  (2, "b"),
  (3, "c"),
  (0, "x"),
  (6, "c"),
  (4, "a"),
  (1, "a"),
  (5, "a")
)).toDF("category_1", "category_2")

val unseenDF = spark.createDataFrame(Seq(
  (123, "e"),
  (6, "c"),
  (2, "b"),
  (456, "c"),
  (1, "a")
)).toDF("category_1", "category_2")

val knownOneHotDF = new OneHotEncoder()
  .setDropLast(true)
  .setInputCol("category_1")
  .setOutputCol("category_1_one_hot")
  .transform(knownDF)
knownOneHotDF.show()
// +----------+----------+------------------+
// |category_1|category_2|category_1_one_hot|
// +----------+----------+------------------+
// |         2|         b|     (6,[2],[1.0])|
// |         3|         c|     (6,[3],[1.0])|
// |         0|         x|     (6,[0],[1.0])|
// |         6|         c|         (6,[],[])|
// |         4|         a|     (6,[4],[1.0])|
// |         1|         a|     (6,[1],[1.0])|
// |         5|         a|     (6,[5],[1.0])|
// +----------+----------+------------------+

val unseenOneHotDF = new OneHotEncoder()
  .setDropLast(true)
  .setInputCol("category_1")
  .setOutputCol("category_1_one_hot")
  .transform(unseenDF)
unseenOneHotDF.show()
// +----------+----------+------------------+
// |category_1|category_2|category_1_one_hot|
// +----------+----------+------------------+
// |       123|         e| (456,[123],[1.0])|
// |         6|         c|   (456,[6],[1.0])|
// |         2|         b|   (456,[2],[1.0])|
// |       456|         c|       (456,[],[])|
// |         1|         a|   (456,[1],[1.0])|
// +----------+----------+------------------+

ref:
https://spark.apache.org/docs/latest/ml-features.html#onehotencoder
https://stackoverflow.com/questions/32277576/how-to-handle-categorical-features-with-spark-ml/40615508
https://stackoverflow.com/questions/33089781/spark-dataframe-handing-empty-string-in-onehotencoder

Create a Regular Expression Tokenizer

setGaps(true) 時的 pattern 是 match 分隔符;setGaps(false) 時的 pattern 則是 match 字。

import org.apache.spark.ml.feature.RegexTokenizer
import org.apache.spark.sql.functions._

val sentenceDF = spark.createDataFrame(Seq(
  (1, "Hi, I heard about Spark"),
  (2, "I wish Java could use case classes."),
  (3, "Deep,Learning,models,are,state-of-the-art"),
  (4, "fuck_yeah!!! No.")
)).toDF("id", "sentence")

val countTokensUDF = udf((words: Seq[String]) => words.length)

val regexTokenizer = new RegexTokenizer()
  .setInputCol("sentence")
  .setOutputCol("words")
  .setPattern("""[\w\-_]+""").setGaps(false)
  // .setPattern("""\W""").setGaps(true)
  // .setPattern("""[,. ]""").setGaps(true)
val tokenizedDF = regexTokenizer.transform(sentenceDF)

val df = tokenizedDF
  .select("sentence", "words")
  .withColumn("count", countTokensUDF($"words"))
// +-----------------------------------------+-----------------------------------------------+-----+
// |sentence                                 |words                                          |count|
// +-----------------------------------------+-----------------------------------------------+-----+
// |Hi, I heard about Spark                  |[hi, i, heard, about, spark]                   |5    |
// |I wish Java could use case classes.      |[i, wish, java, could, use, case, classes]     |7    |
// |Deep,Learning,models,are,state-of-the-art|[deep, learning, models, are, state-of-the-art]|5    |
// |fuck_yeah!!! No.                         |[fuck_yeah, no]                                |2    |
// +-----------------------------------------+-----------------------------------------------+-----+

ref:
https://spark.apache.org/docs/latest/ml-features.html#tokenizer

Handle Comma-seperated Categorical Column

You could use RegexTokenizer, CountVectorizer or HashingTF.

import org.apache.spark.ml.feature.{RegexTokenizer, CountVectorizer}

val df = spark.createDataFrame(Seq(
  (1, "Action,Sci-Fi"),
  (2, "Sci-Fi,Romance,Horror"),
  (3, "War,Horror")
)).toDF("movie_id", "genres")

val regexTokenizer = new RegexTokenizer()
  .setInputCol("genres")
  .setOutputCol("genres_words")
  .setPattern("""[\w\-_]+""").setGaps(false)
val wordsDF = regexTokenizer.transform(df)

val countVectorizerModel = new CountVectorizer()
  .setInputCol("genres_words")
  .setOutputCol("genres_vector")
  .setMinDF(1) // for whole corpus, delete any term that appears less then n times
  .setMinTF(1) // for each document, delete any term that appears less then n times
  .fit(wordsDF)
val countVectorDF = countModel.transform(wordsDF)

// HashingTF might suffer from potential hash collisions
// it's good to use a power of two
val hashingTF = new HashingTF()
  .setInputCol("genres_words")
  .setOutputCol("genres_htf_vector")
  .setNumFeatures(4)
val htfVectorDF = hashingTF.transform(countVectorDF)

htfVectorDF.show(false)
// +--------+---------------------+-------------------------+-------------------------+-------------------+
// |movie_id|genres               |genres_words             |genres_count_vector      |genres_htf_vector  |
// +--------+---------------------+-------------------------+-------------------------+-------------------+
// |1       |Action,Sci-Fi        |[action, sci-fi]         |(5,[0,3],[1.0,1.0])      |(4,[0],[2.0])      |
// |2       |Sci-Fi,Romance,Horror|[sci-fi, romance, horror]|(5,[0,1,4],[1.0,1.0,1.0])|(4,[0,2],[2.0,1.0])|
// |3       |War,Horror           |[war, horror]            |(5,[1,2],[1.0,1.0])      |(4,[0,2],[1.0,1.0])|
// +--------+---------------------+-------------------------+-------------------------+-------------------+

countModel.vocabulary
// Array(sci-fi, horror, action, romance, war)

ref:
https://spark.apache.org/docs/latest/ml-features.html#countvectorizer
https://spark.apache.org/docs/latest/ml-features.html#tf-idf

Train a Word2Vec Model

The output vector of any Word2Vec model is dense!

import org.apache.spark.ml.feature.Word2Vec

val df = spark.createDataFrame(Seq(
  (1, "Hi I heard about Apache Spark".toLowerCase().split(" ")),
  (2, "I wish Java could use case classes".toLowerCase().split(" ")),
  (3, "Logistic regression models are neat".toLowerCase().split(" ")),
  (4, "Apache Spark with Scala is awesome".toLowerCase().split(" ")),
  (5, Array("中文", "嘛ㄟ通", "but", "必須", "另外", "分詞"))
)).toDF("id", "words")

val word2Vec = new Word2Vec()
  .setInputCol("words")
  .setOutputCol("words_w2v")
  .setMaxIter(10)
  .setVectorSize(3)
  .setWindowSize(5)
  .setMinCount(1)
val word2VecModel = word2Vec.fit(df)

word2VecModel.transform(df)
// +---+------------------------------------------+----------------------------------------------------------+
// |id |words                                     |words_w2v                                                 |
// +---+------------------------------------------+----------------------------------------------------------+
// |1  |[hi, i, heard, about, apache, spark]      |[-0.02013699459393,-0.02995631482274,0.047685102870066956]|
// |2  |[i, wish, java, could, use, case, classes]|[-0.05012317272186,0.01141336891094,-0.03742781743806387] |
// |3  |[logistic, regression, models, are, neat] |[-0.04678827972413,0.032994424477,0.0010566591750830413]  |
// |4  |[apache, spark, with, scala, is, awesome] |[0.0265524153169,0.02056275321716,0.013326843579610188]   |
// |5  |[中文, 嘛ㄟ通, but, 必須, 另外, 分詞]         |[0.0571783996973,-0.02301329133545,0.013507421438892681]  |
// +---+------------------------------------------+----------------------------------------------------------+

val df2 = spark.createDataFrame(Seq(
  (6, Array("not-in-vocabularies", "neither", "no")),
  (7, Array("spark", "not-in-vocabularies")),
  (8, Array("not-in-vocabularies", "spark")),
  (9, Array("no", "not-in-vocabularies", "spark")),
  (10, Array("中文", "spark"))
)).toDF("id", "words")

word2VecModel.transform(df2)
// the order of words doesn't mater
// +---+-------------------------------------+-----------------------------------------------------------------+
// |id |words                                |words_w2v                                                        |
// +---+-------------------------------------+-----------------------------------------------------------------+
// |6  |[not-in-vocabularies, neither, no]   |[0.0,0.0,0.0]                                                    |
// |7  |[spark, hell_no, not-in-vocabularies]|[0.0027440187210838,-0.0529780387878418,0.05730373660723368]     |
// |8  |[hell_no, not-in-vocabularies, spark]|[0.0027440187210838,-0.0529780387878418,0.05730373660723368]     |
// |9  |[not-in-vocabularies, hell_no, spark]|[0.0027440187210838,-0.0529780387878418,0.05730373660723368]     |
// |10 |[no, not-in-vocabularies, spark]     |[0.0027440187210838,-0.0529780387878418,0.05730373660723368]     |
// |11 |[中文, spark]                         |[-0.009499748703092337,-0.018227852880954742,0.13357853144407272]|
// +---+-------------------------------------+-----------------------------------------------------------------+

anotherWord2VecModel.findSynonyms("developer", 5)
// +-----------+------------------+
// |       word|        similarity|
// +-----------+------------------+
// |        dev| 0.881394624710083|
// |development|0.7730562090873718|
// |       oier|0.6866029500961304|
// |  develover|0.6720684766769409|
// |     webdev|0.6582568883895874|
// +-----------+------------------+

ref:
https://spark.apache.org/docs/latest/ml-features.html#word2vec

Calculate the Pearson Correlation between Features

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Matrix
import org.apache.spark.ml.stat.Correlation
import org.apache.spark.sql.Row

val featureNames = Array("stargazers_count", "forks_count", "subscribers_count")
val vectorAssembler = new VectorAssembler()
  .setInputCols(featureNames)
  .setOutputCol("features")

val df = vectorAssembler.transform(rawRepoInfoDS)
val correlationDF = Correlation.corr(df, "features")
val Row(coeff: Matrix) = correlationDF.head

println(featureNames.mkString(", "))
println(coeff.toString)
// stargazers_count, forks_count, subscribers_count
// 1.0                 0.5336901230713282  0.7664204175159971  
// 0.5336901230713282  1.0                 0.5414244966152617  
// 0.7664204175159971  0.5414244966152617  1.0

ref:
https://spark.apache.org/docs/latest/ml-statistics.html

DIMSUM

import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}

val repoWordRDD = repoVectorDF
  .select($"repo_id", $"text_w2v")
  .rdd
  .flatMap((row: Row) => {
    val repoId = row.getInt(0)
    val vector = row.getAs[DenseVector](1)
    vector.toArray.zipWithIndex.map({
      case (element, index) => MatrixEntry(repoId, index, element)
    })
  })
val repoWordMatrix = new CoordinateMatrix(repoWordRDD)
val wordRepoMatrix = repoWordMatrix.transpose

val repoSimilarityRDD = wordRepoMatrix
  .toRowMatrix
  .columnSimilarities(0.1)
  .entries
  .flatMap({
    case MatrixEntry(row: Long, col: Long, sim: Double) => {
      if (sim >= 0.5) {
        Array((row, col, sim))
      }
      else {
        None
      }
    }
  })
spark.createDataFrame(repoSimilarityRDD).toDF("item_1", "item_2", "similarity")
repoSimilarityDF.show(false)

ref:
https://stackoverflow.com/questions/42455725/columnsimilarities-back-to-spark-data-frame
https://forums.databricks.com/questions/248/when-should-i-use-rowmatrixcolumnsimilarities.html

Train a Locality Sensitive Hashing (LSH) Model: Bucketed Random Projection LSH

To specify the value of bucketLength, if input vectors are normalized, 1-10 times of pow(numRecords, -1/inputDim) would be a reasonable value. For instance, Math.pow(334913.0, -1.0 / 200.0) = 0.9383726472256705.

import org.apache.spark.ml.feature.BucketedRandomProjectionLSH
import org.apache.spark.ml.linalg.Vectors

val userDF = spark.createDataFrame(Seq(
  (1, Vectors.sparse(6, Seq((0, -4.0), (1, 1.0), (2, 0.2)))),
  (2, Vectors.sparse(6, Seq((0, 5.5), (1, -0.6), (2, 9.0)))),
  (3, Vectors.sparse(6, Seq((1, 1.0), (2, 5.3), (4, 3.0)))),
  (4, Vectors.sparse(6, Seq((1, 1.0), (2, 1.0), (4, 1.0)))),
  (5, Vectors.sparse(6, Seq((2, 1.0), (5, -0.2)))),
  (6, Vectors.sparse(6, Seq((0, 0.7)))),
  (7, Vectors.sparse(6, Seq((1, 0.3), (2, 1.0))))
)).toDF("user_id", "features")

val repoDF = spark.createDataFrame(Seq(
  (11, Vectors.sparse(6, Seq((0, 1.0), (1, 1.0), (2, 1.0), (3, 1.0), (4, 1.0), (5, 1.0)))),
  (12, Vectors.sparse(6, Seq((0, 9.0), (1, -2.0), (2, -21.0), (3, 9.0), (4, 1.0), (5, 9.0)))),
  (13, Vectors.sparse(6, Seq((0, 1.0), (1, 1.0), (2, -3.0), (3, 3.0), (4, 7.0), (5, 9.0)))),
  (14, Vectors.sparse(6, Seq((0, 1.0), (1, 1.0), (2, -3.0)))),
  (15, Vectors.sparse(6, Seq((1, 1.0), (2, 1.0))))
)).toDF("repo_id", "features")

val lsh = new BucketedRandomProjectionLSH()
  .setBucketLength(0.6812920690579612)
  .setNumHashTables(4)
  .setInputCol("features")
  .setOutputCol("hashes")
val lshModel = lsh.fit(repoDF)

val hashedUserDF = lshModel.transform(userDF)
val hashedRepoDF = lshModel.transform(repoDF)
hashedRepoDF.show(false)
// +-------+----------------------------------------------+--------------------------------+
// |repo_id|features                                      |hashes                          |
// +-------+----------------------------------------------+--------------------------------+
// |11     |(6,[0,1,2,3,4,5],[1.0,1.0,1.0,1.0,1.0,1.0])   |[[1.0], [-2.0], [-1.0], [-1.0]] |
// |12     |(6,[0,1,2,3,4,5],[9.0,-2.0,-21.0,9.0,1.0,9.0])|[[21.0], [-28.0], [18.0], [0.0]]|
// |13     |(6,[0,1,2,3,4,5],[1.0,1.0,-3.0,3.0,7.0,9.0])  |[[4.0], [-10.0], [6.0], [-3.0]] |
// |14     |(6,[0,1,2],[1.0,1.0,-3.0])                    |[[2.0], [-3.0], [2.0], [1.0]]   |
// |15     |(6,[1,2],[1.0,1.0])                           |[[-1.0], [0.0], [-2.0], [0.0]]  |
// +-------+----------------------------------------------+--------------------------------+

val similarDF = lshModel
  .approxSimilarityJoin(hashedUserDF, hashedRepoDF, 10.0, "distance")
  .select($"datasetA.user_id".alias("user_id"), $"datasetB.repo_id".alias("repo_id"), $"distance")
  .orderBy($"user_id", $"distance".asc)
similarDF.show(false)
// +-------+-------+------------------+
// |user_id|repo_id|distance          |
// +-------+-------+------------------+
// |1      |15     |4.079215610874228 |
// |3      |15     |5.243090691567332 |
// |4      |15     |1.0               |
// |4      |11     |1.7320508075688772|
// |5      |15     |1.019803902718557 |
// |5      |11     |2.33238075793812  |
// |6      |15     |1.57797338380595  |
// |7      |15     |0.7               |
// |7      |11     |2.118962010041709 |
// +-------+-------+------------------+

val userVector = Vectors.sparse(6, Seq((0, 1.5), (1, 0.8), (2, 2.0)))
val singleSimilarDF = lshModel
  .approxNearestNeighbors(hashedRepoDF, userVector, 5, "distance")
  .select($"repo_id", $"features", $"distance")
singleSimilarDF.show(false)
// +-------+----------------------------------------------+------------------+
// |repo_id|features                                      |distance          |
// +-------+----------------------------------------------+------------------+
// |15     |(6,[1,2],[1.0,1.0])                           |1.8138357147217055|
// |12     |(6,[0,1,2,3,4,5],[9.0,-2.0,-21.0,9.0,1.0,9.0])|27.49709075520536 |
// +-------+----------------------------------------------+------------------+

The problem of approxSimilarityJoin() is that you can't control the number of generated items, the disadvantage of approxNearestNeighbors() is that you have to manually iterate all users to find similar items. Moreover, both methods can easily suffer from the infamous java.lang.OutOfMemoryError.

ref:
https://spark.apache.org/docs/latest/ml-features.html#locality-sensitive-hashing

Train a Locality Sensitive Hashing (LSH) Model: MinHash LSH

MinHash LSH treats input as a binary vector, that is, all non-zero values (include negative values) are just 1. Basically, the Word2Vec vector won't be an appropriate input to MinHash LSH.

import org.apache.spark.ml.feature.MinHashLSH
import org.apache.spark.ml.linalg.Vectors

val userDF = spark.createDataFrame(Seq(
  (1, Vectors.sparse(6, Seq((0, -4.0), (1, 1.0), (2, 0.2)))),
  (2, Vectors.sparse(6, Seq((0, 5.5), (1, -0.6), (2, 9.0)))),
  (3, Vectors.sparse(6, Seq((1, 1.0), (2, 5.3), (4, 3.0)))),
  (4, Vectors.sparse(6, Seq((1, 1.0), (2, 1.0), (4, 1.0)))),
  (5, Vectors.sparse(6, Seq((2, 1.0), (5, -0.2)))),
  (6, Vectors.sparse(6, Seq((2, 0.7)))),
  (7, Vectors.sparse(6, Seq((3, 0.3), (5, 1.0))))
)).toDF("user_id", "features")

val repoDF = spark.createDataFrame(Seq(
  (11, Vectors.sparse(6, Seq((1, 1.0), (3, 1.0), (5, 1.0)))),
  (12, Vectors.sparse(6, Seq((2, 1.0), (3, 1.0), (5, 1.0)))),
  (13, Vectors.sparse(6, Seq((1, 1.0), (2, 1.0), (4, 1.0))))
)).toDF("repo_id", "features")

val lsh = new MinHashLSH()
  .setNumHashTables(4)
  .setInputCol("features")
  .setOutputCol("hashes")

val lshModel = lsh.fit(userDF)
val hashedUserDF = lshModel.transform(userDF)
val hashedRepoDF = lshModel.transform(repoDF)

hashedUserDF.show(false)
// user 1 and 2 have the same hashed vector
// user 3 and 4 have the same hashed vector
// +-------+--------------------------+-----------------------------------------------------------------------+
// |user_id|features                  |hashes                                                                 |
// +-------+--------------------------+-----------------------------------------------------------------------+
// |1      |(6,[0,1,2],[-4.0,1.0,0.2])|[[-2.031299587E9], [-1.974869772E9], [-1.974047307E9], [4.95314097E8]] |
// |2      |(6,[0,1,2],[5.5,-0.6,9.0])|[[-2.031299587E9], [-1.974869772E9], [-1.974047307E9], [4.95314097E8]] |
// |3      |(6,[1,2,4],[1.0,5.3,3.0]) |[[-2.031299587E9], [-1.974869772E9], [-1.230128022E9], [8.7126731E8]]  |
// |4      |(6,[1,2,4],[1.0,1.0,1.0]) |[[-2.031299587E9], [-1.974869772E9], [-1.230128022E9], [8.7126731E8]]  |
// |5      |(6,[2,5],[1.0,-0.2])      |[[-2.031299587E9], [-1.758749518E9], [-4.86208737E8], [-1.919887134E9]]|
// |6      |(6,[2],[0.7])             |[[-2.031299587E9], [-1.758749518E9], [-4.86208737E8], [1.247220523E9]] |
// |7      |(6,[3,5],[0.3,1.0])       |[[-1.278435698E9], [-1.542629264E9], [2.57710548E8], [-1.919887134E9]] |
// +-------+--------------------------+-----------------------------------------------------------------------+

val userSimilarRepoDF = lshModel
  .approxSimilarityJoin(hashedUserDF, hashedRepoDF, 0.6, "distance")
  .select($"datasetA.user_id".alias("user_id"), $"datasetB.repo_id".alias("repo_id"), $"distance")
  .orderBy($"user_id", $"distance".asc)

userSimilarRepoDF.show(false)
// +-------+-------+-------------------+
// |user_id|repo_id|distance           |
// +-------+-------+-------------------+
// |1      |13     |0.5                |
// |2      |13     |0.5                |
// |3      |13     |0.0                |
// |4      |13     |0.0                |
// |5      |12     |0.33333333333333337|
// |7      |12     |0.33333333333333337|
// |7      |11     |0.33333333333333337|
// +-------+-------+-------------------+

ref:
https://databricks.com/blog/2017/05/09/detecting-abuse-scale-locality-sensitive-hashing-uber-engineering.html

Train a Logistic Regression Model

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.Vectors

val training = spark.createDataFrame(Seq(
  (1.0, Vectors.dense(1.0, 2.5, 0.0, 0.0)),
  (1.0, Vectors.dense(0.1, 9.0, 0.0, 0.0)),
  (1.0, Vectors.dense(0.0, 0.0, 1.0, 0.0)),
  (0.0, Vectors.dense(0.0, 0.0, 2.0, 9.0)),
  (0.0, Vectors.dense(1.0, 0.0, 0.0, 5.0))
)).toDF("label", "features")

val lr = new LogisticRegression()
  .setMaxIter(100)
  .setRegParam(0.0)
  .setElasticNetParam(0.0)
  .setFamily("binomial")
  .setFeaturesCol("features")
  .setLabelCol("label")

lr.explainParams()

val lrModel = lr.fit(training)

println(s"Coefficients: ${lrModel.coefficients}")
// [2.0149015925419,2.694173163503675,9.547978766053463,-5.592221425156231]

println(s"Intercept: ${lrModel.intercept}")
// 8.552229795281482

val result = lrModel.transform(test)

ref:
https://spark.apache.org/docs/latest/ml-classification-regression.html#logistic-regression
https://spark.apache.org/docs/latest/mllib-linear-methods.html#logistic-regression

import org.apache.spark.ml.classification.BinaryLogisticRegressionSummary

val binarySummary = lrModel.summary.asInstanceOf[BinaryLogisticRegressionSummary]
println(s"Area Under ROC: ${binarySummary.areaUnderROC}")

ref:
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.ml.classification.BinaryLogisticRegressionTrainingSummary

Evaluate a Binary Classification Model

import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.linalg.Vectors

val df = spark.createDataFrame(Seq(
  (Vectors.dense(0.0, 2.5), 1.0), // correct
  (Vectors.dense(1.0, 4.1), 1.0), // correct
  (Vectors.dense(9.2, 1.1), 0.0), // correct
  (Vectors.dense(1.0, 0.1), 0.0), // correct
  (Vectors.dense(5.0, 0.5), 1.0)  // incorrect
)).toDF("rawPrediction", "starring")

val evaluator = new BinaryClassificationEvaluator()
  .setMetricName("areaUnderROC")
  .setRawPredictionCol("rawPrediction")
  .setLabelCol("starring")
val metric = evaluator.evaluate(df)
// 0.8333333333333333

ref:
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

Train an ALS Model

import org.apache.spark.ml.recommendation.ALS

val df = spark.createDataFrame(Seq(
  (1, 1, 12),
  (1, 2, 90),
  (1, 4, 4),
  (2, 4, 1),
  (3, 5, 8)
)).toDF("user", "item", "rating")

val als = new ALS()
  .setImplicitPrefs(true)
  .setRank(5)
  .setRegParam(0.5)
  .setAlpha(40)
  .setMaxIter(10)
  .setSeed(42)
  .setColdStartStrategy("drop")
val alsModel = als.fit(df)

val predictionDF = alsModel.transform(df)
// +----+----+------+----------+
// |user|item|rating|prediction|
// +----+----+------+----------+
// |   1|   1|    12| 0.9988487|
// |   3|   5|     8| 0.9984464|
// |   1|   4|     4|0.99887615|
// |   2|   4|     1| 0.9921428|
// |   1|   2|    90| 0.9997897|
// +----+----+------+----------+

predictionDF.printSchema()
// root
 // |-- user: integer (nullable = false)
 // |-- item: integer (nullable = false)
 // |-- rating: integer (nullable = false)
// |-- prediction: float (nullable = false)

val userRecommendationsDF = alsModel.recommendForAllUsers(15)
// +----+-----------------------------------------------------------------+
// |user|recommendations                                                  |
// +----+-----------------------------------------------------------------+
// |1   |[[2,0.9997897], [4,0.9988761], [1,0.9988487], [5,0.0]]           |
// |3   |[[5,0.9984464], [1,2.9802322E-8], [2,0.0], [4,0.0]]              |
// |2   |[[4,0.9921428], [2,0.10759391], [1,0.10749264], [5,1.4901161E-8]]|
// +----+-----------------------------------------------------------------+

userRecommendationsDF.printSchema()
// root
 // |-- user: integer (nullable = false)
 // |-- recommendations: array (nullable = true)
 // |    |-- element: struct (containsNull = true)
 // |    |    |-- item: integer (nullable = true)
// |    |    |-- rating: float (nullable = true)

ref:
https://spark.apache.org/docs/latest/ml-collaborative-filtering.html

Save and Load an ALS Model

import org.apache.hadoop.mapred.InvalidInputException
import org.apache.spark.ml.recommendation.{ALS, ALSModel}

val alsModelSavePath = "./spark-data/20170902/alsModel.parquet"
val alsModel: ALSModel = try {
  ALSModel.load(alsModelSavePath)
} catch {
  case e: InvalidInputException => {
    if (e.getMessage().contains("Input path does not exist")) {
      val als = new ALS()
        .setImplicitPrefs(true)
        .setRank(100)
        .setRegParam(0.5)
        .setAlpha(40)
        .setMaxIter(22)
        .setSeed(42)
        .setColdStartStrategy("drop")
        .setUserCol("user_id")
        .setItemCol("repo_id")
        .setRatingCol("starring")
      val alsModel = als.fit(rawRepoStarringDS)
      alsModel.save(alsModelSavePath)
      alsModel
    } else {
      throw e
    }
  }
}

Create a Custom Transformer

package ws.vinta.albedo.transformers

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.{DoubleParam, Param, ParamMap}
import org.apache.spark.ml.util.{DefaultParamsWritable, Identifiable}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Dataset, Row}

import scala.collection.mutable

class NegativeBalancer(override val uid: String, val bcPopularItems: Broadcast[mutable.LinkedHashSet[Int]])
  extends Transformer with DefaultParamsWritable {

  def this(bcPopularItems: Broadcast[mutable.LinkedHashSet[Int]]) = {
    this(Identifiable.randomUID("negativeBalancer"), bcPopularItems)
  }

  val userCol = new Param[String](this, "userCol", "User 所在的欄位名稱")

  def getUserCol: String = $(userCol)

  def setUserCol(value: String): this.type = set(userCol, value)
  setDefault(userCol -> "user")

  val itemCol = new Param[String](this, "itemCol", "Item 所在的欄位名稱")

  def getItemCol: String = $(itemCol)

  def setItemCol(value: String): this.type = set(itemCol, value)
  setDefault(itemCol -> "item")

  val labelCol = new Param[String](this, "labelCol", "Label 所在的欄位名稱")

  def getLabelCol: String = $(labelCol)

  def setLabelCol(value: String): this.type = set(labelCol, value)
  setDefault(labelCol -> "label")

  val negativeValue = new DoubleParam(this, "negativeValue", "負樣本的值")

  def getNegativeValue: Double = $(negativeValue)

  def setNegativeValue(value: Double): this.type = set(negativeValue, value)
  setDefault(negativeValue -> 0.0)

  val negativePositiveRatio = new DoubleParam(this, "negativePositiveRatio", "負樣本與正樣本的比例")

  def getNegativePositiveRatio: Double = $(negativePositiveRatio)

  def setNegativePositiveRatio(value: Double): this.type = set(negativePositiveRatio, value)
  setDefault(negativePositiveRatio -> 1.0)

  override def transformSchema(schema: StructType): StructType = {
    Map($(userCol) -> IntegerType, $(itemCol) -> IntegerType, $(labelCol) -> DoubleType)
      .foreach{
        case(columnName: String, expectedDataType: DataType) => {
          val actualDataType = schema(columnName).dataType
          require(actualDataType.equals(IntegerType), s"Column $columnName must be of type $expectedDataType but was actually $actualDataType.")
        }
      }

    schema
  }

  override def transform(dataset: Dataset[_]): DataFrame = {
    transformSchema(dataset.schema)

    val popularItems: mutable.LinkedHashSet[Int] = this.bcPopularItems.value

    val emptyItemSet = new mutable.HashSet[Int]
    val addToItemSet = (itemSet: mutable.HashSet[Int], item: Int) => itemSet += item
    val mergeItemSets = (set1: mutable.HashSet[Int], set2: mutable.HashSet[Int]) => set1 ++= set2

    val getUserNegativeItems = (userItemsPair: (Int, mutable.HashSet[Int])) => {
      val (user, positiveItems) = userItemsPair
      val negativeItems = popularItems.diff(positiveItems)
      val requiredNegativeItemsCount = (positiveItems.size * this.getNegativePositiveRatio).toInt
      (user, negativeItems.slice(0, requiredNegativeItemsCount))
    }
    val expandNegativeItems = (userItemsPair: (Int, mutable.LinkedHashSet[Int])) => {
      val (user, negativeItems) = userItemsPair
      negativeItems.map({(user, _, $(negativeValue))})
    }

    import dataset.sparkSession.implicits._

    // TODO: 目前是假設傳進來的 dataset 都是 positive samples,之後可能得處理含有 negative samples 的情況
    val negativeDF = dataset
      .select($(userCol), $(itemCol))
      .rdd
      .map({
        case Row(user: Int, item: Int) => (user, item)
      })
      .aggregateByKey(emptyItemSet)(addToItemSet, mergeItemSets)
      .map(getUserNegativeItems)
      .flatMap(expandNegativeItems)
      .toDF($(userCol), $(itemCol), $(labelCol))

    dataset.select($(userCol), $(itemCol), $(labelCol)).union(negativeDF)
  }

  override def copy(extra: ParamMap): this.type = {
    defaultCopy(extra)
  }
}

ref:
https://www.safaribooksonline.com/library/view/high-performance-spark/9781491943199/ch09.html#extending_spark_ml
https://stackoverflow.com/questions/40615713/how-to-write-a-custom-transformer-in-mllib
https://issues.apache.org/jira/browse/SPARK-17048

Create a Custom Evaluator

package ws.vinta.albedo.evaluators

import org.apache.spark.ml.evaluation.Evaluator
import org.apache.spark.ml.param.{Param, ParamMap}
import org.apache.spark.ml.util.{DefaultParamsWritable, Identifiable}
import org.apache.spark.mllib.evaluation.RankingMetrics
import org.apache.spark.sql.{DataFrame, Dataset, Row}

class RankingEvaluator(override val uid: String, val userActualItemsDF: DataFrame)
  extends Evaluator with DefaultParamsWritable {

  def this(userActualItemsDF: DataFrame) = {
    this(Identifiable.randomUID("rankingEvaluator"), userActualItemsDF)
  }

  val metricName = new Param[String](this, "metricName", "評估方式")

  def getMetricName: String = $(metricName)

  def setMetricName(value: String): this.type = set(metricName, value)
  setDefault(metricName -> "[email protected]")

  val k = new Param[Int](this, "k", "只評估前 k 個 items 的排序結果")

  def getK: Int = $(k)

  def setK(value: Int): this.type = set(k, value)
  setDefault(k -> 15)

  override def isLargerBetter: Boolean = $(metricName) match {
    case "map" => true
    case "[email protected]" => true
    case "[email protected]" => true
  }

  override def evaluate(dataset: Dataset[_]): Double = {
    import dataset.sparkSession.implicits._

    val userPredictedItemsDF = dataset.select($"user_id", $"recommendations.repo_id".alias("items"))

    val bothItemsRDD = userPredictedItemsDF.join(userActualItemsDF, Seq("user_id", "user_id"))
      .select(userPredictedItemsDF.col("items"), userActualItemsDF.col("items"))
      .rdd
      .map((row: Row) => {
        // Row(userPredictedItems, userActualItems)
        (row(0).asInstanceOf[Seq[Int]].toArray, row(1).asInstanceOf[Seq[Int]].toArray)
      })

    val rankingMetrics = new RankingMetrics(bothItemsRDD)
    val metric = $(metricName) match {
      case "map" => rankingMetrics.meanAveragePrecision
      case "[email protected]" => rankingMetrics.ndcgAt($(k))
      case "[email protected]" => rankingMetrics.precisionAt($(k))
    }
    metric
  }

  override def copy(extra: ParamMap): RankingEvaluator = {
    defaultCopy(extra)
  }
}

ref:
https://spark.apache.org/docs/latest/mllib-evaluation-metrics.html#ranking-systems
https://www.safaribooksonline.com/library/view/spark-the-definitive/9781491912201/ch19.html#s6c5---recommendation

Apply Transformer on Multiple Columns

import org.apache.spark.ml.feature._

val userCategoricalColumnNames = Array("account_type", "clean_company", "clean_email", "clean_location")
val userCategoricalTransformers = userCategoricalColumnNames.flatMap((columnName: String) => {
  val stringIndexer = new StringIndexer()
    .setInputCol(columnName)
    .setOutputCol(s"${columnName}_index")
    .setHandleInvalid("keep")
  val oneHotEncoder = new OneHotEncoder()
    .setInputCol(s"${columnName}_index")
    .setOutputCol(s"${columnName}_ohe")
    .setDropLast(true)
  Array(stringIndexer, oneHotEncoder)
})
userCategoricalTransformers.foreach(println)
// strIdx_4029f57e379a
// oneHot_f0decb92a05c
// strIdx_fb855ad6caaa
// oneHot_f1be19344002
// strIdx_7fa62a683293
// oneHot_097ae442d8fc
// strIdx_0ff7ffa022a1
// oneHot_4a9f72a7f5d8

ref:
https://stackoverflow.com/questions/34167105/using-spark-mls-onehotencoder-on-multiple-columns

Cross-validate a Pipeline Model

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

val vectorAssembler = new VectorAssembler()
  .setInputCols(Array("feature1", "feature2", "feature3"))
  .setOutputCol("features")

val lr = new LogisticRegression()
  .setFeaturesCol("features")
  .setLabelCol("starring")

val pipeline = new Pipeline()
  .setStages(Array(vectorAssembler, lr))

val paramGrid = new ParamGridBuilder()
  .addGrid(lr.maxIter, Array(20, 100))
  .addGrid(lr.regParam, Array(0.0, 0.5, 1.0, 2.0))
  .addGrid(lr.elasticNetParam, Array(0.0, 0.5, 1.0))
  .build()

val evaluator = new BinaryClassificationEvaluator()
  .setMetricName("areaUnderROC")
  .setRawPredictionCol("rawPrediction")
  .setLabelCol("starring")

val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEstimatorParamMaps(paramGrid)
  .setEvaluator(evaluator)
  .setNumFolds(3)

val cvModel = cv.fit(trainingDF)

ref:
https://spark.apache.org/docs/latest/ml-tuning.html#cross-validation

Extract Best Parameters from a Cross-validation Model

import org.apache.spark.ml.PipelineModel
import org.apache.spark.ml.classification.LogisticRegressionModel

val bestPipelineModel = cvModel.bestModel.asInstanceOf[PipelineModel]
val lrModel = bestPipelineModel.stages(0).asInstanceOf[LogisticRegressionModel]
lrModel.extractParamMap()
// or
lrModel.explainParams()

ref:
https://stackoverflow.com/questions/31749593/how-to-extract-best-parameters-from-a-crossvalidatormodel

Show All Parameters of a Cross-validation Model

import org.apache.spark.ml.param.ParamMap

cvModel.getEstimatorParamMaps
  .zip(cvModel.avgMetrics)
  .sortWith(_._2 > _._2)
  .foreach((pair: (ParamMap, Double)) => {
    println(s"${pair._2}: ${pair._1}")
  })
// 0.8999999999999999: {
//     hashingTF_ac8be8d5806b-numFeatures: 1000,
//     logreg_9f79de6e51ec-regParam: 0.1
// }
// 0.8875: {
//     hashingTF_ac8be8d5806b-numFeatures: 100,
//     logreg_9f79de6e51ec-regParam: 0.1
// }
// 0.875: {
//     hashingTF_ac8be8d5806b-numFeatures: 100,
//     logreg_9f79de6e51ec-regParam: 0.01
// }

ref:
https://stackoverflow.com/questions/31749593/how-to-extract-best-parameters-from-a-crossvalidatormodel
https://alvinalexander.com/scala/how-sort-scala-sequences-seq-list-array-buffer-vector-ordering-ordered

Feature Engineering 特徵工程中常見的方法

Feature Engineering 特徵工程中常見的方法

Feature Engineering 是把 raw data 轉換成 features 的整個過程的總稱。基本上特徵工程就是個手藝活,講求的是創造力。

本文不定期更新中。

Missing Value Imputation

最簡單暴力的做法當然就是直接 drop 掉那些含有缺失值的 rows。

針對 numerical 特徵的缺失值,可以用以下方式取代:

  • 0,缺點是可能會混淆其他本來就是 0 的數值
  • -999,用某個正常情況下不會出現的數值代替,但是選得不好可能會變成異常值,要特別對待
  • Mean,平均數
  • Median,中位數,跟平均數相比,不會被異常值干擾

針對 categorical 特徵的缺失值,可以用以下方式取代:

  • Mode,眾數,最常見的值
  • 改成 "Others" 之類的值

假設你要填補 age 這個特徵,然後你有其他例如 gender 這樣的特徵,你可以分別計算男性和女性的 age 的 mean、median 和 mode 來填補缺失值;更複雜一點的方式是,你可以把沒有缺失值的數據挑出來,用它們來訓練一個 regression 或 classification 模型,用這個模型來預測缺失值。

不過其實有些演算法是可以容許缺失值的,這時候可以新增一個 has_missing_value 欄位(稱為 NA indicator column)。

ref:
http://adataanalyst.com/machine-learning/comprehensive-guide-feature-engineering/
https://stats.stackexchange.com/questions/28860/why-adding-an-na-indicator-column-instead-of-value-imputation-for-randomforest

Outliers Detection

發現離群值最直觀的方式就是畫圖表,針對單一特徵可以使用 box plot;兩兩特徵則可以使用 scatter plot。

處置離群值的方式通常是直接刪除或是做變換(例如 log transformation 或 binning),當然你也可以套用處理缺失值的方式。

ref:
https://www.analyticsvidhya.com/blog/2016/01/guide-data-exploration/
https://www.douban.com/note/413022836/

Duplicate Entries Removal

duplicate 或 redundant 尤其指的是那些 features 都一樣,但是 target variable 卻不同的數據。

Feature Scaling 特徵縮放

Standardization 標準化

原始資料中,因為各個特徵的含義和單位不同,每個特徵的取值範圍可能會差異很大。例如某個二元特徵的範圍是 0 或 1,另一個價格特徵的範圍可能是 [0, 1000000],由於取值範圍相差過大導致了模型可能會更偏向於取值範圍較大的那個特徵。解決的辦法就是把各種不同 scale 的特徵轉換成同樣的 scale,稱為標準化或正規化。

狹義來說,標準化專門指的是透過計算 z-score,讓數據的 mean 為 0、 variance 為 1。

ref:
https://spark.apache.org/docs/latest/ml-features.html#standardscaler
http://scikit-learn.org/stable/modules/preprocessing.html#standardization-or-mean-removal-and-variance-scaling
https://www.quora.com/How-bad-is-it-to-standardize-dummy-variables

Normalization 歸一化、正規化

歸一化是指把每個樣本縮放到單位範數(每個樣本的範數為 1),適用於計算 dot product 或者兩個樣本之間的相似性。除了標準化、歸一化之外,其他還有透過最大、最小值,把數據的範圍縮放到 [0, 1] 或 [-1, 1] 的區間縮放法,不過這個方法容易受異常值的影響。

標準化是分別對單一特徵進行(針對 column);歸一化是對每個 observation 進行(針對 row)。

對 SVM、logistic regression 或其他使用 squared loss function 的演算法來說,需要 standardization;對 Vector Space Model 來說,需要 normalization;至於 tree-based 的演算法,基本上都不需要標準化或歸一化,它們對 scale 不敏感。

ref:
https://spark.apache.org/docs/latest/ml-features.html#normalizer
http://scikit-learn.org/stable/modules/preprocessing.html#normalization
https://www.qcloud.com/community/article/689521

Feature Transformation 特徵變換

以下適用 continuous 特徵:

Rounding

某些精度有到小數點後第 n 位的特徵,如果你其實不需要那麼精確,可以考慮 round(value * m)round(log(value)) 這樣的做法,甚至可以把 round 之後的數值當成 categorical 特徵。

confidence  round(confidence * 10)
0.9594      10
0.1254      1
0.1854      2
0.5454      5
0.3655      4

Log Transformation

因為 x 越大,log(x) 增長的速度就越慢,所以取 log 的意義是可以 compress 大數和 expand 小數,換句話說就是壓縮 "long tail" 和展開 "head"。假設 x 原本的範圍是 [100, 1000],log(x, 10) 之後的範圍就變成 [2, 3] 了。也常常使用 log(1 + x)log(x / (1 - x))

另外一種類似的做法是 square root 平方根或 cube root 立方根(可以用在負數)。

ref:
https://www.safaribooksonline.com/library/view/mastering-feature-engineering/9781491953235/ch02.html

Binarization 二值化

對數值型的數據設定一個 threshold,大於就賦值為 1、小於就賦值為 0。例如 score,如果你只關心「及格」或「不及格」,可以直接把成績對應到 1(score >= 60)和 0(score < 60)。或是你要做啤酒銷量分析,你可以新增一個 age >= 18 的特徵來標示出已成年。

你有一個 color 的 categorical 特徵,如果你不在乎實際上是什麼顏色的話,其實也可以改成 has_color

ref:
https://spark.apache.org/docs/latest/ml-features.html#binarizer

Binning

也稱為 bucketization。

age 這樣的特徵為例,你可以把所有年齡拆分成 n 段,0-20 歲、20-40 歲、40-60 歲等或是 0-18 歲、18-40 歲、40-70 歲等(等距或等量),然後把個別的年齡對應到某一段,假設 26 歲是對應到第二個 bucket,那新特徵的值就是 2。這種方式是人為地指定每個 bucket 的邊界值,還有另外一種拆分法是根據數據的分佈來拆,稱為 quantization 或 quantile binning,你只需要指定 bucket 的數量即可。

同樣的概念應用到其他地方,可以把 datetime 特徵拆分成上午、中午、下午和晚上;如果是 categorical 特徵,則可以先 SELECT count() ... GROUP BY,然後把出現次數小於某個 threshold 的值改成 "Other" 之類的。或者是你有一個 occupation 特徵,如果你其實不需要非常準確的職業資訊的話,可以把 "Web Developer"、"iOS Developer" 或 "DBA" 這些個別的資料都改成 "Software Engineer"。

binarization 和 binning 都是對 continuous 特徵做 discretization 離散化,增強模型的非線性泛化能力。

ref:
https://spark.apache.org/docs/latest/ml-features.html#bucketizer
https://spark.apache.org/docs/latest/ml-features.html#quantilediscretizer
https://github.com/collectivemedia/spark-ext#optimal-binning
https://www.qcloud.com/community/article/689521

以下適用 categorical 特徵:

ref:
https://en.wikipedia.org/wiki/Categorical_variable
https://www.safaribooksonline.com/library/view/introduction-to-machine/9781449369880/ch04.html

Integer Encoding

也稱為 label encoding。

把每個 category 對應到數字,一種做法是隨機對應到 0, 1, 2, 3, 4 等數字;另外一種做法是依照該值出現的頻率大小的順序來給值,例如最常出現的值給 0,依序給 1, 2, 3 等等。如果是針對一些在某種程度上有次序的 categorical 特徵(稱為 ordinal),例如「鑽石會員」「白金會員」「黃金會員」「普通會員」,直接 mapping 成數字可能沒什麼問題,但是如果是類似 colorcity 這樣的沒有明顯大小的特徵的話,還是用 one-hot encoding 比較合適。不過如果用的是 tree-based 的演算法就無所謂了。

有些 categorical 特徵也可能會用數字表示(例如 id),跟 continuous 特徵的差別是,數值的差異或大小對 categorical 特徵來說沒有太大的意義。

ref:
http://breezedeus.github.io/2014/11/15/breezedeus-feature-processing.html
http://phunters.lofter.com/post/86d56_194e956

One-hot Encoding (OHE)

如果某個特徵有 m 種值(例如 Taipei, Beijing, Tokyo),那它 one-hot encode 之後就會變成長度為 m 的向量:

city    city_Taipei city_Beijing city_tokyo
Taipei  1           0            0
Beijing 0           1            0
Tokyo   0           0            1

你也可以改用 Dummy coding,這樣就只需要產生長度為 m -1 的向量:

city    city_Taipei city_Beijing
Taipei  1           0
Beijing 0           1
Tokyo   0           0

OHE 的缺點是容易造成特徵的維度大幅增加和沒辦法處理之前沒見過的值。

ref:
http://scikit-learn.org/stable/modules/preprocessing.html#preprocessing-categorical-features
https://blog.myyellowroad.com/using-categorical-data-in-machine-learning-with-python-from-dummy-variables-to-deep-category-66041f734512

Bin-counting

例如在 Computational Advertising 中,如果你有針對每個 user 的「廣告曝光數(包含點擊和未點擊)」和「廣告點擊數」,你就可以算出每個 user 的「點擊率」,然後用這個機率來表示每個 user,反之也可以對 ad id 使用類似的做法。

ad_id   ad_views  ad_clicks  ad_ctr
412533  18339     1355       0.074
423334  335       12         0.036
345664  1244      132        0.106
349833  35387     1244       0.035

ref:
https://blogs.technet.microsoft.com/machinelearning/2015/02/17/big-learning-made-easy-with-counts/

換個思路,如果你有一個 brand 的特徵,然後你可以從 user 的購買記錄中找出購買 A 品牌的人,有 70% 的人會購買 B 品牌、有 40% 的人會購買 C 品牌;購買 D 品牌的人,有 10% 的人會購買 A 品牌和 E 品牌,你可以每個品牌表示成這樣:

brand  A    B    C    D    E
A      1.0  0.7  0.4  0.0  0.0
B      ...
C      ...
D      0.1  0.0  0.0  1.0  0.1
E      ...

ref:
http://phunters.lofter.com/post/86d56_194e956

LabelCount Encoding

類似 Bin-cunting 的做法,一樣是利用現有的 count 或其他統計上的資料,差別在於 LabelCount Encoding 最後用的是次序而不是數值本身。優點是對異常值不敏感。

ad_id   ad_clicks  ad_rank
412533  1355       1
423334  12         4
345664  132        3
349833  1244       2

ref:
https://www.slideshare.net/gabrielspmoreira/feature-engineering-getting-most-out-of-data-for-predictive-models-tdc-2017/47

Count Vectorization

除了可以用在 text 特徵之外,如果你有 comma-seperated 的 categorical 特徵也可以使用這個方法。例如電影類型 genre,裡頭的值長這樣 Action,Sci-Fi,Drama,就可以先用 RegexTokenizer 轉成 Array("action", "sci-fi", "drama"),再用 CountVectorizer 轉成 vector。

ref:
https://spark.apache.org/docs/latest/ml-features.html#countvectorizer

Feature Hashing

以 user id 為例,透過一個 hash function 把每一個 user id 映射到 (hashed1_, hashed_2, ..., hashed_m) 的某個值。指定 m << user id 的取值範圍,所以缺點是會有 collision(如果你的 model 足夠 robust,倒也是可以不管),優點是可以良好地處理之前沒見過的值和罕見的值。當然不只可以 hash 單一值,也可以 hash 一個 vector。

你可以把 feature hashing 表示為單一欄位的數值(例如 2)或是類似 one-hot encoding 那樣的多欄位的 binary 表示法(例如 [0, 0, 1])。

import hashlib

def hash_func(s, n_bins=100000):
    s = s.encode('utf-8')
    return int(hashlib.md5(s).hexdigest(), 16) % (n_bins - 1) + 1

print(hash_func('some categorical value'))

ref:
https://github.com/apache/spark/pull/18513
https://spark.apache.org/docs/latest/ml-features.html#feature-transformation
https://www.slideshare.net/gabrielspmoreira/feature-engineering-getting-most-out-of-data-for-predictive-models-tdc-2017/42

Mean Encoding

ref:
https://zhuanlan.zhihu.com/p/26308272

Category Embedding

ref:
https://arxiv.org/abs/1604.06737
https://www.slideshare.net/HJvanVeen/feature-engineering-72376750/17
https://blog.myyellowroad.com/using-categorical-data-in-machine-learning-with-python-from-dummy-variables-to-deep-category-42fd0a43b009

User Profile 用戶畫像

使用用戶畫像來表示每個 user id,例如用戶的年齡、性別、職業、收入、居住地、偏好的各種 tag 等,把每個 user 表示成一個 feature vector。除了單一維度的特徵之外,也可以建立「用戶聽過的歌都是哪些曲風」、「用戶(30 天內)瀏覽過的文章都是什麼分類,以 TF-IDF 的方式表達。或者是把用戶所有喜歡文章對應的向量的平均值作為此用戶的 profile。比如某個用戶經常關注與推薦系統有關的文章,那麼他的 profile 中 "CB"、"CF" 和 "推薦系統" 對應的權重值就會較高。

ref:
https://mp.weixin.qq.com/s/w87-dyG9Ap9xJ_HZu0Qn-w
https://medium.com/unstructured/how-feature-engineering-can-help-you-do-well-in-a-kaggle-competition-part-i-9cc9a883514d

Rare Categorical Variables

先計算好每一種 category 的數量,然後把小於某個 threshold 的 category 都改成 "Others" 之類的值。或是使用 clustering 演算法來達到同樣的目的。你也可以直接建立一個新的 binary feature 叫做 rare,要來標示那些相對少見的資料點。

Unseen Categorical Variables

以 Spark ML 為例,當你用 training set 的資料 fit 了一個 StringIndexer(和 OneHotEncoder),把它拿去用在 test set 上時,有一定的機率你會遇到某些 categorical 特徵的值只在 test set 出現,所以對只見過 training set 的 transformer 來說,這些就是所謂的 unseen values。

對付 unseen values 通常有幾種做法:

  • 用整個 training set + test set 來編碼 categorical 特徵
  • 直接捨棄含有 unseen values 的那筆資料
  • 把 unseen values 改成 "Others" 之類的已知值。StringIndexer.setHandleInvalid("keep") 基本上就是這種做法

如果採用第一種方式,一但你把這個 transformer 拿到 production 去用時,無可避免地還是會遇到 unseen values。不過通常線上的 feature engineering 會有別的做法,例如事先把 user 或 item 的各項特徵都算好(定期更新或是 data 產生的時候觸發),然後以 id 為 key 存進 Redis 之類的 NoSQL 裡,model 要用的時候直接用 user id / item id 拿到處理好的 feature vector。

ref:
https://stackoverflow.com/questions/34681534/spark-ml-stringindexer-handling-unseen-labels

Large Categorical Variables

針對那種非常大的 categorical 特徵(例如 id 類的特徵),如果你用的是 logistic regression,其實可以硬上 one-hot encoding。不然就是利用上面提到的 feature hashing 或 bin counting 等方式;如果是 GBDT 的話,甚至可以直接用 id 硬上,只要 tree 足夠多。

ref:
https://www.zhihu.com/question/34819617

Feature Construction 特徵建構

特徵構建指的是從原有的特徵中,人工地創造出新的特徵,通常用來解決一般的線性模型沒辦法學到非線性特徵的問題。其中一個重點可能是能不能夠過某些辦法,在特徵中加入某些「額外的資訊」,雖然也得小心數據偏見的問題。

如果你有很多 user 購物的資料,除了可以 aggregate 得到 total spend 這樣的 feature 之外,也可以變換一下,變成 spend in last weekspend in last monthspend in last year 這種可以表示「趨勢」的特徵。

範例:

  • author_avg_page_view: 該文章作者的所有文章的平均瀏覽數
  • user_visited_days_since_doc_published: 該文章發布到該用戶訪問經過了多少天
  • user_history_doc_sim_categories: 用戶讀過的所有文章的分類和該篇文章的分類的 TF-IDF 的相似度
  • user_history_doc_sim_topics: 用戶讀過的所有文章的內文和該篇文章的內文的 TF-IDF 的相似度

ref:
https://medium.com/unstructured/how-feature-engineering-can-help-you-do-well-in-a-kaggle-competition-part-i-9cc9a883514d
https://www.safaribooksonline.com/library/view/large-scale-machine/9781785888748/ch04s02.html
https://www.slideshare.net/HJvanVeen/feature-engineering-72376750/23

Temporal Features 時間特徵

對於 date / time 類型的資料,除了轉換成 timestamp 和取出 day、month 和 year 做成新的欄位之外,也可以對 hour 做 binning(分成上午、中午、晚上之類的)或是對 day 做 binning(分成工作日、週末);或是想辦法查出該日期當天的天氣、節日或活動等訊息,例如 is_national_holidayhas_sport_events

更進一步,用 datetime 類的資料通常也可以做成 spend_hours_last_weekspend_money_last_week 這種可以用來表示「趨勢」的特徵。

Text Features 文字特徵

ref:
https://www.slideshare.net/HJvanVeen/feature-engineering-72376750/57

Spatial Features 地理特徵

如果你有 cityaddress 等特徵,可以新建出 latitudelongitude 兩個 features(當然你得透過外部的 API 或資料來源才做得到),再組合出 median_income_within_2_miles 這樣的特徵。

ref:
https://www.slideshare.net/HJvanVeen/feature-engineering-72376750/47

Cyclical Features

ref:
http://blog.davidkaleko.com/feature-engineering-cyclical-features.html

Features Interaction 特徵交互

假設你有 AB 兩個 continuous 特徵,你可以用 A + BA - BA * BA / B 之類的方式建立新的特徵。例如 house_age_at_purchase = house_built_date - house_purchase_date 或是 click_through_rate = n_clicks / n_impressions

還有一種類似的作法叫 Polynomial Expansion 多項式展開,當 degree 為 2 時,可以把 (x, y) 兩個特徵變成 (x, x * x, y, x * y, y * y) 五個特徵。

ref:
https://spark.apache.org/docs/latest/ml-features.html#polynomialexpansion
https://elitedatascience.com/feature-engineering-best-practices

Feature Combination 特徵組合

也稱為特徵交叉。

特徵組合主要是針對 categorical 特徵,特徵交互則是適用於 continuous 特徵。但是兩者的概念是差不多的,就是把兩個以上的特徵透過某種方式結合在一起,變成新的特徵。通常用來解決一般的線性模型沒辦法學到非線性特徵的問題。

假設有 genderwealth 兩個特徵,分別有 2 和 3 種取值,最簡單的方式就是直接 string concatenation 組合出一個新的特徵 gender_wealth,共有 2 x 3 = 6 種取值。因為是 categorical 特徵,可以直接對 gender_wealth 使用 StringIndexerOneHotEncoder。你當然也可以一起組合 continuous 和 categorical 特徵,例如 age_wealth 這樣的特徵,只是 vector 裡的值就不是 0 1 而是 age 本身了。

假設 C 是 categorical 特徵,N 是 continuous 特徵,以下有幾種有意義的組合:

  • median(N) GROUP BY C 中位數
  • mean(N) GROUP BY C 算術平均數
  • mode(N) GROUP BY C 眾數
  • min(N) GROUP BY C 最小值
  • max(N) GROUP BY C 最大值
  • std(N) GROUP BY C 標準差
  • var(N) GROUP BY C 方差
  • N - median(N) GROUP BY C
user_id  age  gender  wealth  gender_wealth  gender_wealth_ohe   age_wealth
1        56   male    rich    male_rich      [1, 0, 0, 0, 0, 0]  [56, 0, 0]
2        30   male    middle  male_middle    [0, 1, 0, 0, 0, 0]  [0, 30, 0]
3        19   female  rich    female_rich    [0, 0, 0, 1, 0, 0]  [19, 0, 0]
4        62   female  poor    female_poor    [0, 0, 0, 0, 0, 1]  [0, 0, 62]
5        78   male    poor    male_poor      [0, 0, 1, 0, 0, 0]  [0, 0, 78]
6        34   female  middle  female_middle  [0, 0, 0, 0, 1, 0]  [0, 34, 0]

ref:
http://breezedeus.github.io/2014/11/15/breezedeus-feature-processing.html
http://phunters.lofter.com/post/86d56_194e956
https://zhuanlan.zhihu.com/p/26444240
http://blog.csdn.net/mytestmy/article/details/40933235

Feature Extraction 特徵提取

通常就是指 dimensionality reduction。

  • Principal Component Analysis (PCA)
  • Latent Dirichlet Allocation (LDA)
  • Latent Semantic Analysis (LSA)

Feature Selection 特徵選擇

特徵選擇是指透過某些方法自動地從所有的特徵中挑選出有用的特徵。

ref:
http://scikit-learn.org/stable/modules/feature_selection.html

Filter Method

採用某一種評估指標(發散性、相關性或 Information Gain 等),單獨地衡量個別特徵跟 target variable 之間的關係,常用的方法有 Chi Square Test(卡方檢驗)。這種特徵選擇方式沒有任何模型的參與。

以相關性來說,也不見得跟 target variable 的相關性越高就越好,

ref:
https://spark.apache.org/docs/latest/ml-features.html#chisqselector
http://files.cnblogs.com/files/XBWer/%E6%9C%BA%E5%99%A8%E5%AD%A6%E4%B9%A0%E3%81%AE%E7%89%B9%E5%BE%81.pdf

Wrapper Method

會採用某個模型來預測你的 target variable,把特徵選擇想成是一個組合優化的問題,想辦法找出一組特徵子集能夠讓模型的評估結果最好。缺點是太耗時間了,實務上不常用。

ref:
http://www.cnblogs.com/heaad/archive/2011/01/02/1924088.html

Embedded Method

通常會採用一個會為特徵賦予 coefficients 或 importances 的演算法,例如 Logistic Regression(特別是使用 L1 penalty)或 GBDT,直接用權重或重要性對所有特徵排序,然後取前 n 個作為特徵子集。

ref:
http://scikit-learn.org/stable/modules/feature_selection.html#feature-selection-using-selectfrommodel
https://www.zhihu.com/question/28641663

Feature Learning 特徵學習

也稱為 Representation Learning 或 Automated Feature Engineering。

  • GBDT
  • Neural Network: Restricted Boltzmann Machines
  • Deep Learning: Autoencoder

ref:
https://zhuanlan.zhihu.com/p/26444240

Data Leakage 數據洩漏

就是指你在 features 中直接或間接地加入了跟 target variable 有關的數據。

ref:
https://zhuanlan.zhihu.com/p/26444240

Target Engineering

雖然不能算是 feature engineering 的一部分,但是其實你也可以對 target variable / label(就是你的模型要預測的那個值)做點變換。例如 log(y + 1)exp(y) - 1

碼天狗週刊 第 99 期 @vinta - Apache Spark, Python, Machine Learning, Feature Engineering, Testing, Linux

碼天狗週刊 第 99 期 @vinta - Apache Spark, Python, Machine Learning, Feature Engineering, Testing, Linux

本文同步發表於 CodeTengu Weekly - Issue 99

Spark SQL cookbook (Python)

最近在為 StreetVoice 開發一個音樂的推薦系統,採用 Apache Spark,不過因為老是忘記 DataFrame 某某功能的用法,所以就乾脆仿效 O'Reilly 著名的 Cookbook 系列,幫自己寫了一篇 Spark SQL cookbook,複習、速查兩相宜啊。

因為 Spark 支援 Scala、Java、Python 和 R,一開始是打算用 Scala 來練練功的,不過畢竟是公司的專案,考慮到後續其他人的參與和維護,好像還是採用一個團隊成員都熟悉的語言比較好吶(成熟的大人.jpg)。

延伸閱讀:

How to Size Executors, Cores and Memory for a Spark application running in memory

在使用 spark-submit 的時候可以指定 --driver-memory--executor-memory--executor-cores--num-executors 等參數來配置你的 Spark app 可以使用的運算資源,這篇文章指出了幾個需要注意的地方以及 One executor per core 和 One executor per node 這兩種做法會有什麼問題。

P.S. 現在 Spark 除了 Standalone 和 YARN 模式之外,也開始實驗性地支援 Kubernetes 了:apache-spark-on-k8s,看樣子 k8s 真的有一統江湖之勢了啊。

Mastering Feature Engineering

整個推薦系統的 pipeline 可以很粗略地分成 candidate generation 和 ranking 兩個部分,而 ranking 常用的模型之一就是簡單粗暴的 Logistic Regression(通常還會搭配 GBDT 或 Deep Neural Networks)。因為要用 LR 需要大量的 Feature Engineering,所以我就特地找了一本專門在講特徵工程的書,上週末去剪頭髮的時候終於讀完,正好可以推薦給大家。

不過這本書講的是比較基礎的部分(不要想一步登天嘛),例如針對數值特徵的 Binning 或標準化、針對文字特徵的 TF-IDF 和針對類別特徵的 One-hot encoding 或 Feature hashing,對創建出非線性特徵的 Feature Construction 則沒有什麼著墨。可以搭配前幾期推薦過的「机器学习中的数据清洗与特征处理综述」一起看。

Write Explicit Tests

Sometimes, normal programming good practices don’t apply to software tests. DRY in particular I don’t subscribe to for test code, because I want my tests to read like a story. - Kent Beck 如是說

你減少了重複,但是卻帶來了耦合。寫程式真的很難啊。

Strace - The SysAdmin's Microscope

strace 是個可以用來觀測某個 script 或 process 在 system call 這個層面到底做了哪些事的指令,是 troubleshooting 的好幫手,尤其是用來解決在 Linux 上大家喜聞樂見的「幹你娘為什麼 xxx 跑不起來?!(20 分鐘之後)噢我權限設錯了」的問題。

延伸閱讀: