In this series of articles, we use tools such as Apache Spark, XGBoost, Elasticsearch, and MySQL to build a Machine Learning Pipeline for a recommender system. A recommender system can be roughly split into two stages: Candidate Generation and Ranking. The former generates a set of candidate items for each user, with methods such as Collaborative Filtering, Content-based recommendation, tag matching, popularity charts, or manual curation; the latter ranks those candidates and presents the Top N as the final recommendations, commonly with a model such as Logistic Regression.
In this article we take Logistic Regression, one of the most common Ranking methods, as an example and use Spark's Logistic Regression model to build a recommender system for GitHub repositories. Using users' starring records and the attributes of users and repos as features, we predict whether a user will star a given repo (a classification problem). The trained model then serves as the Ranking module of our recommender system. Since LR is a linear model, it usually takes a lot of Feature Engineering to capture non-linear relationships, so this article focuses on Spark ML's Pipeline mechanism and feature engineering rather than on the algorithm itself.
The complete source code can be found at https://github.com/vinta/albedo.
Articles in this series:
- Build a recommender system with Spark: Implicit ALS
- Build a recommender system with Spark: Content-based and Elasticsearch
- Build a recommender system with Spark: Logistic Regression
- Feature Engineering: Common Techniques
- Spark ML cookbook (Scala)
- Spark SQL cookbook (Scala)
- (more to come)
Submit the Application
$ spark-submit \
--master spark://192.168.10.100:7077 \
--packages "com.github.fommil.netlib:all:1.1.2,com.hankcs:hanlp:portable-1.3.4,mysql:mysql-connector-java:5.1.41" \
--class ws.vinta.albedo.UserProfileBuilder \
target/albedo-1.0.0-SNAPSHOT.jar
$ spark-submit \
--master spark://192.168.10.100:7077 \
--packages "com.github.fommil.netlib:all:1.1.2,com.hankcs:hanlp:portable-1.3.4,mysql:mysql-connector-java:5.1.41" \
--class ws.vinta.albedo.RepoProfileBuilder \
target/albedo-1.0.0-SNAPSHOT.jar
$ spark-submit \
--master spark://192.168.10.100:7077 \
--packages "com.github.fommil.netlib:all:1.1.2,com.hankcs:hanlp:portable-1.3.4,mysql:mysql-connector-java:5.1.41" \
--class ws.vinta.albedo.LogisticRegressionRanker \
target/albedo-1.0.0-SNAPSHOT.jar
ref:
https://vinta.ws/code/setup-spark-scala-and-maven-with-intellij-idea.html
https://spark.apache.org/docs/latest/submitting-applications.html
https://spoddutur.github.io/spark-notes/distribution_of_executors_cores_and_memory_for_spark_application
Load Data
We previously used the GitHub API and the GitHub Archive dataset on BigQuery to collect 1.5 million starring records along with the related user and repo data. The datasets below roughly mirror the structure of the GitHub API; their schemas are as follows (a sketch of loading them over JDBC follows the schemas):
rawUserInfoDS.printSchema()
// root
// |-- user_id: integer (nullable = true)
// |-- user_login: string (nullable = true)
// |-- user_account_type: string (nullable = true)
// |-- user_name: string (nullable = true)
// |-- user_company: string (nullable = true)
// |-- user_blog: string (nullable = true)
// |-- user_location: string (nullable = true)
// |-- user_email: string (nullable = true)
// |-- user_bio: string (nullable = true)
// |-- user_public_repos_count: integer (nullable = true)
// |-- user_public_gists_count: integer (nullable = true)
// |-- user_followers_count: integer (nullable = true)
// |-- user_following_count: integer (nullable = true)
// |-- user_created_at: timestamp (nullable = true)
// |-- user_updated_at: timestamp (nullable = true)
rawRepoInfoDS.printSchema()
// root
// |-- repo_id: integer (nullable = true)
// |-- repo_owner_id: integer (nullable = true)
// |-- repo_owner_username: string (nullable = true)
// |-- repo_owner_type: string (nullable = true)
// |-- repo_name: string (nullable = true)
// |-- repo_full_name: string (nullable = true)
// |-- repo_description: string (nullable = true)
// |-- repo_language: string (nullable = true)
// |-- repo_created_at: timestamp (nullable = true)
// |-- repo_updated_at: timestamp (nullable = true)
// |-- repo_pushed_at: timestamp (nullable = true)
// |-- repo_homepage: string (nullable = true)
// |-- repo_size: integer (nullable = true)
// |-- repo_stargazers_count: integer (nullable = true)
// |-- repo_forks_count: integer (nullable = true)
// |-- repo_subscribers_count: integer (nullable = true)
// |-- repo_is_fork: boolean (nullable = true)
// |-- repo_has_issues: boolean (nullable = true)
// |-- repo_has_projects: boolean (nullable = true)
// |-- repo_has_downloads: boolean (nullable = true)
// |-- repo_has_wiki: boolean (nullable = true)
// |-- repo_has_pages: boolean (nullable = true)
// |-- repo_open_issues_count: integer (nullable = true)
// |-- repo_topics: string (nullable = true)
rawStarringDS.printSchema()
// root
// |-- user_id: integer (nullable = true)
// |-- repo_id: integer (nullable = true)
// |-- starred_at: timestamp (nullable = true)
// |-- starring: double (nullable = true)
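As a reference, a minimal sketch of loading these three tables from MySQL over JDBC (the database name, table names and credentials below are assumptions, not the project's actual settings):
val dbUrl = "jdbc:mysql://127.0.0.1:3306/albedo?useSSL=false"
val props = new java.util.Properties()
props.setProperty("user", "root")
props.setProperty("password", "secret")
props.setProperty("driver", "com.mysql.jdbc.Driver")
// each table roughly mirrors the corresponding GitHub API response
val rawUserInfoDS = spark.read.jdbc(dbUrl, "app_userinfo", props)
val rawRepoInfoDS = spark.read.jdbc(dbUrl, "app_repoinfo", props)
val rawStarringDS = spark.read.jdbc(dbUrl, "app_repostarring", props)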
ref:
https://www.githubarchive.org/
http://ghtorrent.org/
After loading the data, the first thing to do is Exploratory Data Analysis (EDA): play around with the data a bit. Apache Zeppelin and Databricks notebooks are both worth a try; besides every language that Spark supports, they integrate with NoSQL databases and JDBC sources, and plotting charts is easy, arguably even handier than Jupyter Notebook.
ref:
https://zeppelin.apache.org/
https://databricks.com/
Build User Profile / Item Profile
In this project the two main entities are users and repos, so we build a User Profile and an Item Profile for each of them, which become the features used later during model training. We keep this step separate from model training, which makes the overall architecture more flexible. In practice, we can use the user id or item id as the key and store the prepared features directly in Redis or another schemaless NoSQL database, so that multiple models can reuse them later; when serving real-time recommendations, the features can be fetched quickly and only a few fields need to be recomputed.
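As a concrete illustration of that idea (not code from this project; the redis.clients.jedis dependency and the field choice are assumptions), a minimal sketch of writing a few pre-computed user features into Redis hashes keyed by user id might look like this:
import redis.clients.jedis.Jedis

val jedis = new Jedis("localhost", 6379)
// collect() is fine for a small demo; a real job would write from the executors instead
userProfileDF
  .select("user_id", "user_followers_count", "user_starred_repos_count")
  .collect()
  .foreach(row => {
    val key = s"user_profile:${row.get(0)}"
    jedis.hset(key, "user_followers_count", row.get(1).toString)
    jedis.hset(key, "user_starred_repos_count", row.get(2).toString)
  })
jedis.close()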
That said, since most of the data here comes from the GitHub API, a lot of cleaning and normalization has effectively been done for us already. In reality the data your system has to handle is usually far messier: it comes from all kinds of data sources in all kinds of formats and changes over time, so a fair amount of Extract, Transform, Load (ETL) work is typically needed, and it pays to agree on logging (instrumentation) conventions up front. Moreover, in a production environment the data keeps changing, so monitoring is a crucial part of keeping it fresh and fault-tolerant.
ref:
http://www.algorithmdog.com/ad-rec-deploy
https://tech.meituan.com/online-feature-system02.html
Due to limited space, the rest of this article only covers a few of the important parts. In short, at the end of this step we obtain two DataFrames, userProfileDF and repoProfileDF, which hold the prepared features. The full code is here:
ref:
https://github.com/vinta/albedo/blob/master/src/main/scala/ws/vinta/albedo/UserProfileBuilder.scala
https://github.com/vinta/albedo/blob/master/src/main/scala/ws/vinta/albedo/RepoProfileBuilder.scala
Feature Engineering
Taking recommender systems as an example, features fall into four categories:
- User features: attributes of the user itself, e.g. user id, gender, occupation, or city
- Item features: attributes of the item itself, e.g. item id, author, title, category, rating, or tags
- Interaction features: aggregations or cross features derived from the user's actions on items, e.g. whether the user has watched movies of the same genre, the genre distribution of recently played songs, or how many high-priced products were bought last week
- Context features: metadata of a user-item action, e.g. when it happened, which device was used, or the current GPS location
Some features are available right at data collection time, some require extra steps (such as calling an external API or another model), and some must be updated in real time. By the way, since what we predict is "whether a user will star a repo", the user in the features below can be either a repo stargazer or a repo owner.
Raw features:
- User features
user_id
user_login
user_name
user_email
user_blog
user_bio
user_company
user_location
user_followers_count
user_following_count
user_public_repos_count
user_public_gists_count
user_created_at
user_updated_at
- Item features
repo_id
repo_name
repo_owner
repo_owner_type
repo_language
repo_description
repo_homepage
repo_subscribers_count
repo_stargazers_count
repo_forks_count
repo_size
repo_created_at
repo_updated_at
repo_pushed_at
repo_has_issues
repo_has_projects
repo_has_downloads
repo_has_wiki
repo_has_pages
repo_open_issues_count
repo_topics
- Interaction features
user_stars_repo
user_follows_user
- Context features
user_repo_starred_at
Brainstormed features:
- User features
user_days_between_created_at_today: days since the user's account was created
user_days_between_updated_at_today: days since the user's profile was last updated
user_repos_avg_stargazers_count: average star count of the user's own repos (excluding forks)
user_organizations: which organizations the user belongs to
user_has_null: whether at least one of the user's fields is null
user_has_blog: whether the user has a blog or website
user_is_freelancer: whether the user's bio contains words such as Freelancer
user_is_junior: whether the bio contains words such as Beginner or Junior
user_is_lead: whether the bio contains words such as Team Lead, Architect, Creator, CTO or VP of Engineering
user_is_scholar: whether the bio contains words such as Researcher, Scientist, PhD or Professor
user_is_pm: whether the bio contains words such as Product Manager
user_knows_backend: whether the bio contains words such as Backend or Back end
user_knows_data: whether the bio contains words such as Machine Learning, Deep Learning or Data Science
user_knows_devops: whether the bio contains words such as DevOps, SRE, SysAdmin or Infrastructure
user_knows_frontend: whether the bio contains words such as Frontend or Front end
user_knows_mobile: whether the bio contains words such as Mobile, iOS or Android
user_knows_recsys: whether the bio contains words such as Recommender System, Data Mining or Information Retrieval
user_knows_web: whether the bio contains words such as Web Development or Fullstack
- Item features
repo_created_at_days_since_today: days since the repo was created
repo_updated_at_days_since_today: days since the repo was last updated
repo_pushed_at_days_since_today: days since the repo was last pushed to
repo_stargazers_count_in_30days: stars the repo received in the last 30 days
repo_subscribers_stargazers_ratio: ratio of the repo's watcher count to its star count
repo_forks_stargazers_ratio: ratio of the repo's fork count to its star count
repo_open_issues_stargazers_ratio: ratio of the repo's open issue count to its star count
repo_releases_count: number of releases or tags of the repo
repo_license: the repo's license
repo_readme: content of the repo's README
repo_has_null: whether at least one of the repo's fields is null
repo_has_readme: whether the repo has a README file
repo_has_changelog: whether the repo has a CHANGELOG file
repo_has_contributing: whether the repo has a CONTRIBUTING file
repo_has_tests: whether the repo has tests
repo_has_ci: whether the repo has CI
repo_has_dockerfile: whether the repo has a Dockerfile
repo_is_unmaintained: whether the repo is no longer maintained
repo_is_awesome: whether the repo appears in any awesome-xxx list
repo_is_vinta_starred: whether the repo has been starred by @vinta, a.k.a. the author of this article
- Interaction features
user_starred_repos_count: total number of repos the user has starred
user_avg_daily_starred_repos_count: average number of repos the user stars per day
user_forked_repos_count: total number of repos the user has forked
user_follower_following_count_ratio: ratio of the user's follower count to following count
user_recent_searched_keywords: the 50 keywords the user searched for most recently
user_recent_commented_repos: the 50 repos the user commented on most recently
user_recent_watched_repos: the 50 repos the user watched most recently
user_recent_starred_repos_descriptions: descriptions of the user's 50 most recently starred repos
user_recent_starred_repos_languages: languages of the user's 50 most recently starred repos
user_recent_starred_repos_topics: topics of the user's 50 most recently starred repos
user_follows_repo_owner: whether the user follows the repo's owner
repo_language_index_in_user_recent_repo_languages: position of the repo's language in the user's recently starred languages
repo_language_count_in_user_recent_repo_languages: number of times the repo's language appears in the user's recently starred languages
repo_topics_user_recent_topics_similarity: similarity between the repo's topics and the topics the user recently starred
- Context features
als_model_prediction: the prediction from the ALS model, i.e. how much the user is expected to like the repo
gbdt_model_index: tree (leaf) indices from a GBDT model, used as automatically generated features for the observation
ref:
https://vinta.ws/code/feature-engineering.html
Detect Outliers
Besides missing values, outliers (anomalous values) also deserve attention. For continuous features, a box plot reveals outliers quickly; for categorical features, run a SELECT COUNT(*) ... GROUP BY and plot the count of each category as a bar chart. Depending on the problem you are solving, outliers may be safe to ignore or may need special treatment, for example figuring out why they appear, whether it was a glitch during data collection or a symptom of some deeper underlying factor.
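For example, a couple of quick checks in Spark (a sketch using the columns from the datasets above):
// approximate quartiles of a continuous column, for a box-plot style outlier bound
val Array(q1, median, q3) = rawRepoInfoDS.stat.approxQuantile("repo_stargazers_count", Array(0.25, 0.5, 0.75), 0.01)
val upperBound = q3 + 1.5 * (q3 - q1)

// per-category counts of a categorical column, the DataFrame equivalent of SELECT COUNT(*) ... GROUP BY
rawRepoInfoDS
  .groupBy($"repo_language")
  .count()
  .orderBy($"count".desc)
  .show(20, truncate = false)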
ref:
https://www.analyticsvidhya.com/blog/2016/01/guide-data-exploration/
https://www.slideshare.net/tw_dsconf/123-70852901
Impute Missing Values
You can use df.describe().show() to inspect the statistics of each column: count, mean, stddev, min, and max. Besides df.where("some_column IS NULL"), comparing the count values of different columns is another quick way to spot columns with missing values. While you are at it, check whether the columns with missing values have any relationship with the target variable.
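For example, counting the nulls of every column in one pass (a quick sketch):
import org.apache.spark.sql.functions._

rawUserInfoDS
  .select(rawUserInfoDS.columns.map(c => count(when(col(c).isNull, c)).alias(c)): _*)
  .show()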
Here we simply fill in the missing values for null and NaN entries. Since the columns below are all string-typed, we replace missing values with empty strings to keep later processing simple, and we derive a has_null feature along the way.
For users:
import org.apache.spark.sql.functions._
val nullableColumnNames = Array("user_name", "user_company", "user_blog", "user_location", "user_bio")
val imputedUserInfoDF = rawUserInfoDS
.withColumn("user_has_null", when(nullableColumnNames.map(rawUserInfoDS(_).isNull).reduce(_ || _), true).otherwise(false))
.na.fill("", nullableColumnNames)
For repos:
import org.apache.spark.sql.functions._
val nullableColumnNames = Array("repo_description", "repo_homepage")
val imputedRepoInfoDF = rawRepoInfoDS
.withColumn("repo_has_null", when(nullableColumnNames.map(rawRepoInfoDS(_).isNull).reduce(_ || _), true).otherwise(false))
.na.fill("", nullableColumnNames)
ref:
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameNaFunctions
For numeric columns, consider using Imputer.
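A minimal sketch of what that could look like (the column choice is just for illustration; Imputer expects double or float columns, hence the cast):
import org.apache.spark.ml.feature.Imputer
import org.apache.spark.sql.functions._

val repoSizeDF = rawRepoInfoDS.withColumn("repo_size_double", $"repo_size".cast("double"))
val imputer = new Imputer()
  .setInputCols(Array("repo_size_double"))
  .setOutputCols(Array("repo_size_imputed"))
  .setStrategy("median") // or "mean"
val imputedSizeDF = imputer.fit(repoSizeDF).transform(repoSizeDF)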
ref:
https://spark.apache.org/docs/latest/ml-features.html#imputer
Clean Data
For users, apply a few User-defined Functions to normalize some of the text columns:
import org.apache.spark.sql.functions._
val cleanUserInfoDF = imputedUserInfoDF
.withColumn("user_clean_company", cleanCompanyUDF($"user_company"))
.withColumn("user_clean_location", cleanLocationUDF($"user_location"))
.withColumn("user_clean_bio", lower($"user_bio"))
For repos, filter out those whose repo_stargazers_count is too high or too low, or whose description contains words such as "unmaintained" or "assignment":
val reducedRepoInfo = imputedRepoInfoDF
.where($"repo_is_fork" === false)
.where($"repo_forks_count" <= 90000)
.where($"repo_stargazers_count".between(30, 100000))
val unmaintainedWords = Array("%unmaintained%", "%no longer maintained%", "%deprecated%", "%moved to%")
val assignmentWords = Array("%assignment%", "%作業%", "%作业%")
val demoWords = Array("test", "%demo project%")
val blogWords = Array("my blog")
val cleanRepoInfoDF = reducedRepoInfo
.withColumn("repo_clean_description", lower($"repo_description"))
.withColumn("repo_is_unmaintained", when(unmaintainedWords.map($"repo_clean_description".like(_)).reduce(_ or _), true).otherwise(false))
.withColumn("repo_is_assignment", when(assignmentWords.map($"repo_clean_description".like(_)).reduce(_ or _), true).otherwise(false))
.withColumn("repo_is_demo", when(demoWords.map($"repo_clean_description".like(_)).reduce(_ or _) and $"repo_stargazers_count" <= 40, true).otherwise(false))
.withColumn("repo_is_blog", when(blogWords.map($"repo_clean_description".like(_)).reduce(_ or _) and $"repo_stargazers_count" <= 40, true).otherwise(false))
.where($"repo_is_unmaintained" === false)
.where($"repo_is_assignment" === false)
.where($"repo_is_demo" === false)
.where($"repo_is_blog" === false)
.withColumn("repo_clean_language", lower($"repo_language"))
.withColumn("repo_clean_topics", lower($"repo_topics"))
Construct Features
For users, build new features based on the brainstormed features listed above:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
val webThings = Array("web", "fullstack", "full stack")
val backendThings = Array("backend", "back end", "back-end")
val frontendThings = Array("frontend", "front end", "front-end")
val mobileThings = Array("mobile", "ios", "android")
val devopsThings = Array("devops", "sre", "admin", "infrastructure")
val dataThings = Array("machine learning", "deep learning", "data scien", "data analy")
val recsysThings = Array("data mining", "recommend", "information retrieval")
val leadTitles = Array("team lead", "architect", "creator", "director", "cto", "vp of engineering")
val scholarTitles = Array("researcher", "scientist", "phd", "professor")
val freelancerTitles = Array("freelance")
val juniorTitles = Array("junior", "beginner", "newbie")
val pmTitles = Array("product manager")
val userStarredReposCountDF = rawStarringDS
.groupBy($"user_id")
.agg(count("*").alias("user_starred_repos_count"))
val starringRepoInfoDF = rawStarringDS
.select($"user_id", $"repo_id", $"starred_at")
.join(rawRepoInfoDS, Seq("repo_id"))
val userTopLanguagesDF = starringRepoInfoDF
.withColumn("rank", rank.over(Window.partitionBy($"user_id").orderBy($"starred_at".desc)))
.where($"rank" <= 50)
.groupBy($"user_id")
.agg(collect_list(lower($"repo_language")).alias("user_recent_repo_languages"))
.select($"user_id", $"user_recent_repo_languages")
val userTopTopicsDF = starringRepoInfoDF
.where($"repo_topics" =!= "")
.withColumn("rank", rank.over(Window.partitionBy($"user_id").orderBy($"starred_at".desc)))
.where($"rank" <= 50)
.groupBy($"user_id")
.agg(concat_ws(",", collect_list(lower($"repo_topics"))).alias("temp_user_recent_repo_topics"))
.select($"user_id", split($"temp_user_recent_repo_topics", ",").alias("user_recent_repo_topics"))
val userTopDescriptionDF = starringRepoInfoDF
.where($"repo_description" =!= "")
.withColumn("rank", rank.over(Window.partitionBy($"user_id").orderBy($"starred_at".desc)))
.where($"rank" <= 50)
.groupBy($"user_id")
.agg(concat_ws(" ", collect_list(lower($"repo_description"))).alias("user_recent_repo_descriptions"))
.select($"user_id", $"user_recent_repo_descriptions")
val constructedUserInfoDF = cleanUserInfoDF
.withColumn("user_knows_web", when(webThings.map($"user_clean_bio".like(_)).reduce(_ or _), true).otherwise(false))
.withColumn("user_knows_backend", when(backendThings.map($"user_clean_bio".like(_)).reduce(_ or _), true).otherwise(false))
.withColumn("user_knows_frontend", when(frontendThings.map($"user_clean_bio".like(_)).reduce(_ or _), true).otherwise(false))
.withColumn("user_knows_mobile", when(mobileThings.map($"user_clean_bio".like(_)).reduce(_ or _), true).otherwise(false))
.withColumn("user_knows_devops", when(devopsThings.map($"user_clean_bio".like(_)).reduce(_ or _), true).otherwise(false))
.withColumn("user_knows_data", when(dataThings.map($"user_clean_bio".like(_)).reduce(_ or _), true).otherwise(false))
.withColumn("user_knows_recsys", when(recsysThings.map($"user_clean_bio".like(_)).reduce(_ or _), true).otherwise(false))
.withColumn("user_is_lead", when(leadTitles.map($"user_clean_bio".like(_)).reduce(_ or _), true).otherwise(false))
.withColumn("user_is_scholar", when(scholarTitles.map($"user_clean_bio".like(_)).reduce(_ or _), true).otherwise(false))
.withColumn("user_is_freelancer", when(freelancerTitles.map($"user_clean_bio".like(_)).reduce(_ or _), true).otherwise(false))
.withColumn("user_is_junior", when(juniorTitles.map($"user_clean_bio".like(_)).reduce(_ or _), true).otherwise(false))
.withColumn("user_is_pm", when(pmTitles.map($"user_clean_bio".like(_)).reduce(_ or _), true).otherwise(false))
.withColumn("user_followers_following_ratio", round($"user_followers_count" / ($"user_following_count" + lit(1.0)), 3))
.withColumn("user_days_between_created_at_today", datediff(current_date(), $"user_created_at"))
.withColumn("user_days_between_updated_at_today", datediff(current_date(), $"user_updated_at"))
.join(userStarredReposCountDF, Seq("user_id"))
.withColumn("user_avg_daily_starred_repos_count", round($"user_starred_repos_count" / ($"user_days_between_created_at_today" + lit(1.0)), 3))
.join(userTopDescriptionDF, Seq("user_id"))
.join(userTopTopicsDF, Seq("user_id"))
.join(userTopLanguagesDF, Seq("user_id"))
For repos, build new features from the brainstormed features above in the same spirit (the gist is what matters):
import org.apache.spark.sql.functions._
val vintaStarredRepos = rawStarringDS
.where($"user_id" === 652070)
.select($"repo_id".as[Int])
.collect()
.to[List]
val constructedRepoInfoDF = cleanRepoInfoDF
.withColumn("repo_has_activities_in_60days", datediff(current_date(), $"repo_pushed_at") <= 60)
.withColumn("repo_has_homepage", when($"repo_homepage" === "", false).otherwise(true))
.withColumn("repo_is_vinta_starred", when($"repo_id".isin(vintaStarredRepos: _*), true).otherwise(false))
.withColumn("repo_days_between_created_at_today", datediff(current_date(), $"repo_created_at"))
.withColumn("repo_days_between_updated_at_today", datediff(current_date(), $"repo_updated_at"))
.withColumn("repo_days_between_pushed_at_today", datediff(current_date(), $"repo_pushed_at"))
.withColumn("repo_subscribers_stargazers_ratio", round($"repo_subscribers_count" / ($"repo_stargazers_count" + lit(1.0)), 3))
.withColumn("repo_forks_stargazers_ratio", round($"repo_forks_count" / ($"repo_stargazers_count" + lit(1.0)), 3))
.withColumn("repo_open_issues_stargazers_ratio", round($"repo_open_issues_count" / ($"repo_stargazers_count" + lit(1.0)), 3))
.withColumn("repo_text", lower(concat_ws(" ", $"repo_owner_username", $"repo_name", $"repo_language", $"repo_description")))
ref:
https://databricks.com/blog/2015/09/16/apache-spark-1-5-dataframe-api-highlights.html
Convert Features
For users, this step mainly bins some of the categorical features:
import org.apache.spark.sql.functions._
val companyCountDF = cleanUserInfoDF
.groupBy($"user_clean_company")
.agg(count("*").alias("count_per_user_company"))
val locationCountDF = cleanUserInfoDF
.groupBy($"user_clean_location")
.agg(count("*").alias("count_per_user_location"))
val transformedUserInfoDF = constructedUserInfoDF
.join(companyCountDF, Seq("user_clean_company"))
.join(locationCountDF, Seq("user_clean_location"))
.withColumn("user_has_blog", when($"user_blog" === "", 0.0).otherwise(1.0))
.withColumn("user_binned_company", when($"count_per_user_company" <= 5, "__other").otherwise($"user_clean_company"))
.withColumn("user_binned_location", when($"count_per_user_location" <= 50, "__other").otherwise($"user_clean_location"))
For repos:
import org.apache.spark.sql.functions._
val languagesDF = cleanRepoInfoDF
.groupBy($"repo_clean_language")
.agg(count("*").alias("count_per_repo_language"))
.select($"repo_clean_language", $"count_per_repo_language")
.cache()
val transformedRepoInfoDF = constructedRepoInfoDF
.join(languagesDF, Seq("repo_clean_language"))
.withColumn("repo_binned_language", when($"count_per_repo_language" <= 30, "__other").otherwise($"repo_clean_language"))
.withColumn("repo_clean_topics", split($"repo_topics", ","))
ref:
https://docs.databricks.com/spark/latest/mllib/binary-classification-mllib-pipelines.html
Prepare the Feature Pipeline
We filter out users who starred an enormous number of repos. The collected data shows that some users starred ten or even twenty thousand repos; such a user may be a crawler account or someone who stars everything they see. A recommender system probably means little to them, so we might as well drop them from the dataset.
import org.apache.spark.sql.functions._
val maxStarredReposCount = 2000
val userStarredReposCountDF = rawStarringDS
.groupBy($"user_id")
.agg(count("*").alias("user_starred_repos_count"))
val reducedStarringDF = rawStarringDS
.join(userStarredReposCountDF, Seq("user_id"))
.where($"user_starred_repos_count" <= maxStarredReposCount)
.select($"user_id", $"repo_id", $"starred_at", $"starring")
val profileStarringDF = reducedStarringDF
.join(userProfileDF, Seq("user_id"))
.join(repoProfileDF, Seq("repo_id"))
Build the Feature Pipeline
We express the whole feature-processing flow as a Spark ML Pipeline, which makes it easy to swap out or add new Transformers such as Standardization, One-hot Encoding, and Word2Vec. The prediction from the ALS model is also included as one of the features.
import scala.collection.mutable
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.feature._
import org.apache.spark.ml.recommendation.ALSModel
import ws.vinta.albedo.transformers.HanLPTokenizer
import ws.vinta.albedo.transformers.SimpleVectorAssembler
import ws.vinta.albedo.transformers.UserRepoTransformer
// booleanColumnNames, continuousColumnNames, categoricalColumnNames, listColumnNames and
// textColumnNames are mutable.ArrayBuffer[String]s already populated with the profile
// column names earlier in the full source (omitted here for brevity)
val profileStarringDF = reducedStarringDF
.join(userProfileDF, Seq("user_id"))
.join(repoProfileDF, Seq("repo_id"))
.cache()
categoricalColumnNames += "user_id"
categoricalColumnNames += "repo_id"
val userRepoTransformer = new UserRepoTransformer()
.setInputCols(Array("repo_language", "user_recent_repo_languages"))
continuousColumnNames += "repo_language_index_in_user_recent_repo_languages"
continuousColumnNames += "repo_language_count_in_user_recent_repo_languages"
val alsModelPath = s"${settings.dataDir}/${settings.today}/alsModel.parquet"
val alsModel = ALSModel.load(alsModelPath)
.setUserCol("user_id")
.setItemCol("repo_id")
.setPredictionCol("als_score")
.setColdStartStrategy("drop")
continuousColumnNames += "als_score"
val categoricalTransformers = categoricalColumnNames.flatMap((columnName: String) => {
val stringIndexer = new StringIndexer()
.setInputCol(columnName)
.setOutputCol(s"${columnName}__idx")
.setHandleInvalid("keep")
val oneHotEncoder = new OneHotEncoder()
.setInputCol(s"${columnName}__idx")
.setOutputCol(s"${columnName}__ohe")
.setDropLast(false)
Array(stringIndexer, oneHotEncoder)
})
val listTransformers = listColumnNames.flatMap((columnName: String) => {
val countVectorizerModel = new CountVectorizer()
.setInputCol(columnName)
.setOutputCol(s"${columnName}__cv")
.setMinDF(10)
.setMinTF(1)
Array(countVectorizerModel)
})
val textTransformers = textColumnNames.flatMap((columnName: String) => {
val hanLPTokenizer = new HanLPTokenizer()
.setInputCol(columnName)
.setOutputCol(s"${columnName}__words")
.setShouldRemoveStopWords(true)
val stopWordsRemover = new StopWordsRemover()
.setInputCol(s"${columnName}__words")
.setOutputCol(s"${columnName}__filtered_words")
.setStopWords(StopWordsRemover.loadDefaultStopWords("english"))
val word2VecModelPath = s"${settings.dataDir}/${settings.today}/word2VecModel.parquet"
val word2VecModel = Word2VecModel.load(word2VecModelPath)
.setInputCol(s"${columnName}__filtered_words")
.setOutputCol(s"${columnName}__w2v")
Array(hanLPTokenizer, stopWordsRemover, word2VecModel)
})
val finalBooleanColumnNames = booleanColumnNames.toArray
val finalContinuousColumnNames = continuousColumnNames.toArray
val finalCategoricalColumnNames = categoricalColumnNames.map(columnName => s"${columnName}__ohe").toArray
val finalListColumnNames = listColumnNames.map(columnName => s"${columnName}__cv").toArray
val finalTextColumnNames = textColumnNames.map(columnName => s"${columnName}__w2v").toArray
val vectorAssembler = new SimpleVectorAssembler()
.setInputCols(finalBooleanColumnNames ++ finalContinuousColumnNames ++ finalCategoricalColumnNames ++ finalListColumnNames ++ finalTextColumnNames)
.setOutputCol("features")
val featureStages = mutable.ArrayBuffer.empty[PipelineStage]
featureStages += userRepoTransformer
featureStages += alsModel
featureStages ++= categoricalTransformers
featureStages ++= listTransformers
featureStages ++= textTransformers
featureStages += vectorAssembler
val featurePipeline = new Pipeline().setStages(featureStages.toArray)
val featurePipelineModel = featurePipeline.fit(profileStarringDF)
ref:
https://spark.apache.org/docs/latest/ml-pipeline.html
https://spark.apache.org/docs/latest/ml-features.html
Handle Imbalanced Data
Since we are training a Binary Classification model, we need both positive and negative samples. Our raw data rawStarringDS, however, contains only positive samples: we know which repos each user has starred, but not which repos they have not starred. We could of course treat every repo a user has not starred as a negative sample, but that would produce far too many negatives, and it would not be very reasonable either, because a user not starring a repo does not necessarily mean they dislike it; they may simply not know it exists.
The approach we settled on is to use popular repos that the user has not starred as negative samples. We wrote a Spark Transformer to do exactly that (a simplified DataFrame-only sketch of the idea follows the code below):
import ws.vinta.albedo.transformers.NegativeBalancer
import scala.collection.mutable
val sc = spark.sparkContext
val popularReposDS = loadPopularRepoDF()
val popularRepos = popularReposDS
.select($"repo_id".as[Int])
.collect()
.to[mutable.LinkedHashSet]
val bcPopularRepos = sc.broadcast(popularRepos)
val negativeBalancer = new NegativeBalancer(bcPopularRepos)
.setUserCol("user_id")
.setItemCol("repo_id")
.setTimeCol("starred_at")
.setLabelCol("starring")
.setNegativeValue(0.0)
.setNegativePositiveRatio(2.0)
val balancedStarringDF = negativeBalancer.transform(reducedStarringDF)
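For intuition, the core of what NegativeBalancer does can be sketched with plain DataFrame operations (this simplified version ignores the per-user negative/positive ratio and the timestamp handling, and the cross join would be too expensive to run as-is on a large user base):
import org.apache.spark.sql.functions._

val positiveDF = reducedStarringDF.select($"user_id", $"repo_id", $"starring")
// candidate negatives: every (user, popular repo) pair...
val negativeDF = positiveDF.select($"user_id").distinct()
  .crossJoin(popularReposDS.select($"repo_id"))
  // ...minus the pairs the user actually starred, labeled as 0.0
  .join(positiveDF, Seq("user_id", "repo_id"), "left_anti")
  .withColumn("starring", lit(0.0))
val naiveBalancedDF = positiveDF.union(negativeDF)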
ref:
https://github.com/vinta/albedo/blob/master/src/main/scala/ws/vinta/albedo/transformers/NegativeBalancer.scala
http://www.kdnuggets.com/2017/06/7-techniques-handle-imbalanced-data.html
Split Data
We simply use a holdout split, randomly assigning rows to the training set and the test set. An alternative would be to split by time, using earlier data to predict later behavior (see the sketch after the code below).
val profileBalancedStarringDF = balancedStarringDF
.join(userProfileDF, Seq("user_id"))
.join(repoProfileDF, Seq("repo_id"))
val tmpDF = featurePipelineModel.transform(profileBalancedStarringDF)
val keepColumnName = tmpDF.columns.filter((columnName: String) => {
!columnName.endsWith("__idx") &&
!columnName.endsWith("__ohe") &&
!columnName.endsWith("__cv") &&
!columnName.endsWith("__words") &&
!columnName.endsWith("__filtered_words") &&
!columnName.endsWith("__w2v")
})
val featuredBalancedStarringDF = tmpDF.select(keepColumnName.map(col): _*)
val Array(trainingFeaturedDF, testFeaturedDF) = featuredBalancedStarringDF.randomSplit(Array(0.9, 0.1))
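If we wanted the time-based split mentioned above instead, a minimal sketch could be (the cut-off date here is an arbitrary assumption):
// train on older stars, test on the most recent ones
val cutoffDate = "2017-09-01"
val trainingByTimeDF = featuredBalancedStarringDF.where($"starred_at" < cutoffDate)
val testByTimeDF = featuredBalancedStarringDF.where($"starred_at" >= cutoffDate)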
Build the Model Pipeline
For the sake of future extensibility, we use the Spark ML Pipeline style here as well. Spark ML's LogisticRegression accepts an optional weightCol for adjusting the weight of individual rows.
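One thing to note: the LogisticRegression below reads its features from a standard_features column, which is not produced by any stage shown in this post. Presumably the full source adds a standardization stage to the pipeline; a sketch of such a stage (an assumption, not copied from the project) would be:
import org.apache.spark.ml.feature.StandardScaler

val standardScaler = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("standard_features")
  .setWithMean(false) // keeping withMean off preserves the sparsity of the assembled vector
  .setWithStd(true)
// it would be appended to the model stages before the classifier, e.g. modelStages += standardScaler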
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.SQLTransformer
import org.apache.spark.ml.{Pipeline, PipelineStage}
import scala.collection.mutable
val weightSQL = """
SELECT *,
1.0 AS default_weight,
IF (starring = 1.0, 0.9, 0.1) AS positive_weight,
IF (starring = 1.0 AND datediff(current_date(), starred_at) <= 365, 0.9, 0.1) AS recent_starred_weight
FROM __THIS__
""".stripMargin
val weightTransformer = new SQLTransformer()
.setStatement(weightSQL)
val lr = new LogisticRegression()
.setMaxIter(200)
.setRegParam(0.7)
.setElasticNetParam(0.0)
.setStandardization(true)
.setLabelCol("starring")
.setFeaturesCol("standard_features")
.setWeightCol("recent_starred_weight")
val modelStages = mutable.ArrayBuffer.empty[PipelineStage]
modelStages += weightTransformer
modelStages += lr
val modelPipeline = new Pipeline().setStages(modelStages.toArray)
val modelPipelineModel = modelPipeline.fit(trainingFeaturedDF)
ref:
https://spark.apache.org/docs/latest/ml-classification-regression.html
Evaluate the Model: Classification
Since Logistic Regression is a binary classification model, we can evaluate the result with Spark ML's BinaryClassificationEvaluator. However, because we are building a recommender system, what we really care about is the Top N ranking problem, so treat the AUC value here as a rough reference only.
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
val testRankedDF = modelPipelineModel.transform(testFeaturedDF)
val binaryClassificationEvaluator = new BinaryClassificationEvaluator()
.setMetricName("areaUnderROC")
.setRawPredictionCol("rawPrediction")
.setLabelCol("starring")
val classificationMetric = binaryClassificationEvaluator.evaluate(testRankedDF)
println(s"${binaryClassificationEvaluator.getMetricName} = $classificationMetric")
// areaUnderROC = 0.9450631491281277
ref:
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
https://docs.databricks.com/spark/latest/mllib/binary-classification-mllib-pipelines.html
Generate Candidates
The other important part of a recommender system is generating the candidate item set. Here we use several approaches:
- ALS: collaborative filtering recommendations
- Content-based: content-based recommendations
- Popularity: popularity-based recommendations
Since this article is about the ranking and feature engineering side of the Machine Learning Pipeline, we will not go into candidate generation here; if you are interested, see the source code linked below or the other articles in this series.
import scala.collection.mutable
import ws.vinta.albedo.recommenders.ALSRecommender
import ws.vinta.albedo.recommenders.ContentRecommender
import ws.vinta.albedo.recommenders.PopularityRecommender
import ws.vinta.albedo.recommenders.Recommender
val topK = 30
val alsRecommender = new ALSRecommender()
.setUserCol("user_id")
.setItemCol("repo_id")
.setTopK(topK)
val contentRecommender = new ContentRecommender()
.setUserCol("user_id")
.setItemCol("repo_id")
.setTopK(topK)
.setEnableEvaluationMode(true)
val popularityRecommender = new PopularityRecommender()
.setUserCol("user_id")
.setItemCol("repo_id")
.setTopK(topK)
val recommenders = mutable.ArrayBuffer.empty[Recommender]
recommenders += alsRecommender
recommenders += contentRecommender
recommenders += popularityRecommender
// testUserDF holds the user_ids we want candidates for, e.g. the distinct users in the test set
val candidateDF = recommenders
.map((recommender: Recommender) => recommender.recommendForUsers(testUserDF))
.reduce(_ union _)
.select($"user_id", $"repo_id")
.distinct()
// The output of each Recommender looks like this:
// +-------+-------+----------+------+
// |user_id|repo_id|score |source|
// +-------+-------+----------+------+
// |652070 |1239728|0.6731846 |als |
// |652070 |854078 |0.7187486 |als |
// |652070 |1502338|0.70165294|als |
// |652070 |1184678|0.7434903 |als |
// |652070 |547708 |0.7956538 |als |
// +-------+-------+----------+------+
ref:
https://github.com/vinta/albedo/blob/master/src/main/scala/ws/vinta/albedo/recommenders/ALSRecommender.scala
https://github.com/vinta/albedo/blob/master/src/main/scala/ws/vinta/albedo/recommenders/ContentRecommender.scala
https://github.com/vinta/albedo/blob/master/src/main/scala/ws/vinta/albedo/recommenders/PopularityRecommender.scala
Predict the Ranking
We feed the candidate items to the trained Logistic Regression model for ranking. In the resulting probability column, element 0 is the probability of class 0 (negative) and element 1 is the probability of class 1 (positive).
val profileCandidateDF = candidateDF
.join(userProfileDF, Seq("user_id"))
.join(repoProfileDF, Seq("repo_id"))
val featuredCandidateDF = featurePipelineModel
.transform(profileCandidateDF)
val rankedCandidateDF = modelPipelineModel
.transform(featuredCandidateDF)
// rankedCandidateDF looks like this:
// +-------+--------+----------+----------------------------------------+
// |user_id|repo_id |prediction|probability |
// +-------+--------+----------+----------------------------------------+
// |652070 |83467664|1.0 |[0.12711894229094317,0.8728810577090568]|
// |652070 |55099616|1.0 |[0.1422859437320775,0.8577140562679224] |
// |652070 |42266235|1.0 |[0.1462014853157966,0.8537985146842034] |
// |652070 |78012800|1.0 |[0.15576081067098502,0.844239189329015] |
// |652070 |5928761 |1.0 |[0.16149848941925066,0.8385015105807493]|
// +-------+--------+----------+----------------------------------------+
Evaluate the Model: Ranking
Finally, we evaluate the ranking with NDCG (Normalized Discounted Cumulative Gain), a metric from the Information Retrieval field for measuring ranking quality. Spark MLlib ships with a ready-made RankingMetrics, but it only works with the RDD-based API, so we rewrote it as a DataFrame-based Evaluator.
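The toArrayUDF used below is not defined elsewhere in this post; a minimal version of such a helper (a sketch, not necessarily the project's exact implementation) just turns the ML Vector in the probability column into an array so that element 1, the positive-class probability, can be used for ordering:
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.functions.udf

val toArrayUDF = udf((v: Vector) => v.toArray)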
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import ws.vinta.albedo.evaluators.RankingEvaluator
val userActualItemsDF = reducedStarringDF
.withColumn("rank", rank().over(Window.partitionBy($"user_id").orderBy($"starred_at".desc)))
.where($"rank" <= topK)
.groupBy($"user_id")
.agg(collect_list($"repo_id").alias("items"))
val userPredictedItemsDF = rankedCandidateDF
.withColumn("rank", rank().over(Window.partitionBy($"user_id").orderBy(toArrayUDF($"probability").getItem(1).desc)))
.where($"rank" <= topK)
.groupBy($"user_id")
.agg(collect_list($"repo_id").alias("items"))
val rankingEvaluator = new RankingEvaluator(userActualItemsDF)
.setMetricName("NDCG@k")
.setK(topK)
.setUserCol("user_id")
.setItemsCol("items")
val rankingMetric = rankingEvaluator.evaluate(userPredictedItemsDF)
println(s"${rankingEvaluator.getFormattedMetricName} = $rankingMetric")
// NDCG@30 = 0.021114356461615493
ref:
https://github.com/vinta/albedo/blob/master/src/main/scala/ws/vinta/albedo/evaluators/RankingEvaluator.scala
https://spark.apache.org/docs/latest/mllib-evaluation-metrics.html#ranking-systems
https://weekly.codetengu.com/issues/83#kOxuVxW