在這個系列的文章裡,我們將使用 Apache Spark、XGBoost、Elasticsearch 和 MySQL 等工具來搭建一個推薦系統的 Machine Learning Pipeline。推薦系統的組成可以粗略地分成 Candidate Generation 和 Ranking 兩個部分,前者是針對用戶產生候選物品集,常用的方法有 Collaborative Filtering、Content-based、標籤配對、熱門排行或人工精選等;後者則是對這些候選物品排序,以 Top N 的方式呈現最終的推薦結果,常用的方法有 Logistic Regression。
在本篇文章中,我們將以 Ranking 階段常用的方法之一:Logistic Regression 邏輯迴歸為例,利用 Apache Spark 的 Logistic Regression 模型建立一個 GitHub repositories 的推薦系統,以用戶對 repo 的打星紀錄和用戶與 repo 的各項屬性做為特徵,預測出用戶會不會打星某個 repo(分類問題)。最後訓練出來的模型就可以做為我們的推薦系統的 Ranking 模組。不過因為 LR 是線性模型,所以通常需要大量的 Feature Engineering 來習得非線性關係。所以這篇文章的重點是 Spark ML 的 Pipeline 機制和特徵工程,不會在演算法的部分著墨太多。
完整的程式碼可以在 https://github.com/vinta/albedo 找到。
系列文章:
- Build a recommender system with Spark: Implicit ALS
- Build a recommender system with Spark: Content-based and Elasticsearch
- Build a recommender system with Spark: Logistic Regression
- Feature Engineering 特徵工程中常見的方法
- Spark ML cookbook (Scala)
- Spark SQL cookbook (Scala)
- 不定期更新中
Submit the Application
ref:
https://vinta.ws/code/setup-spark-scala-and-maven-with-intellij-idea.html
https://spark.apache.org/docs/latest/submitting-applications.html
https://spoddutur.github.io/spark-notes/distribution_of_executors_cores_and_memory_for_spark_application
Load Data
我們之前已經利用 GitHub API 和 BigQuery 上的 GitHub Archive 收集了 150 萬筆的打星紀錄和所屬的用戶、repo 數據。目前有以下幾個數據集,大致上是照著 GitHub API 建立的,欄位分別如下:
ref:
https://www.githubarchive.org/
http://ghtorrent.org/
載入資料之後,要做的第一件事應該就是 Exploratory Data Analysis (EDA) 了,把玩一下手上的數據。建議大家可以試試 Apache Zeppelin 或是 Databricks 的 Notebook,除了內建 Spark 支援的所有語言之外,也整合了 NoSQL 和 JDBC 支援的資料庫,要畫圖表也很方便,簡直比 Jupyter Notebook 還好用了。
ref:
https://zeppelin.apache.org/
https://databricks.com/
Build User Profile / Item Profile
在這個專案中,最主要的數據主體就是 user 和 repo,所以我們針對兩者各自建立 User Profile 和 Item Profile,作為之後在模型訓練階段會使用的特徵。我們把這個步驟跟模型訓練的流程分開,這樣對整個架構的搭建會更有彈性。實務上,我們可以用 user id 或 item id 當 key,直接把製作好的特徵存進 Redis 或其他 schemaless 的 NoSQL 資料庫,方便之後給多個模型取用;在做 real-time 推薦時,也可以很快地拿到特徵,只需要重新計算部份欄位即可。
不過因為這裡主要用的是來自 GitHub API 的資料,某種程度上人家已經幫我們做了很多資料清理和正規化的動作了,但是在現實中,你的系統要處理的數據通常不會這麼乾淨,可能來自各種 data source、有著各種格式,還會隨著時間而改變,通常得花上不少力氣做 Extract, Transform, Load (ETL),所以最好在寫 log(埋點)的時候就溝通好。而且在 production 環境中,數據是會一直變動的,要確保數據的時效性和容錯性,很重要的一個部分就是 monitoring。
ref:
http://www.algorithmdog.com/ad-rec-deploy
https://tech.meituan.com/online-feature-system02.html
礙於篇幅有限,以下的文章中我們只會挑幾個重要的部分說明。簡單說,在這個步驟的最後,我們會得到 userProfileDF
和 repoProfileDF
這兩個 DataFrame,分別存放製作好的特徵。詳細的程式碼如下:
ref:
https://github.com/vinta/albedo/blob/master/src/main/scala/ws/vinta/albedo/UserProfileBuilder.scala
https://github.com/vinta/albedo/blob/master/src/main/scala/ws/vinta/albedo/RepoProfileBuilder.scala
Feature Engineering
以推薦系統為例,特徵可以分成以下四種:
- 用戶特徵:用戶本身的各種屬性,例如 user id、性別、職業或所在的城市等
- 物品特徵:物品本身的各種屬性,例如 item id、作者、標題、分類、評分或所屬的標籤等
- 交互特徵:用戶對物品做出的某項行為,該行為的 aggregation 或交叉特徵,例如是否看過同類型的電影、最近聽的歌曲的曲風分佈或上週買了多少高單價的商品
- 上下文特徵:用戶對物品做出的某項行為,該行為的 metadata,例如發生的時間、使用的裝置或當前的 GPS 位置等
有些特徵是在資料採集階段就能拿到,有些特徵則會需要額外的步驟(例如透過外部的 API 或是其他模型)才能取得,也有些特徵必須即時更新。順道一提,因為我們要預測的是「某個用戶會不會打星某個 repo」,所以下述特徵裡的 user 可以是 repo stargazer 也可以是 repo owner。
原始特徵:
- 用戶特徵
user_id
user_login
user_name
user_email
user_blog
user_bio
user_company
user_location
user_followers_coung
user_following_count
user_public_repos_count
user_public_gists_count
user_created_at
user_updated_at
- 物品特徵
repo_id
repo_name
repo_owner
repo_owner_type
repo_language
repo_description
repo_homepage
repo_subscribers_count
repo_stargazers_count
repo_forks_count
repo_size
repo_created_at
repo_updated_at
repo_pushed_at
repo_has_issues
repo_has_projects
repo_has_downloads
repo_has_wiki
repo_has_pages
repo_open_issues_count
repo_topics
- 交互特徵
user_stars_repo
user_follows_user
- 上下文特徵
user_repo_starred_at
發想特徵:
- 用戶特徵
user_days_between_created_at_today
: 該用戶的註冊日期距離今天過了幾年user_days_between_updated_at_today
: 該用戶的更新日期距離今天過了幾天user_repos_avg_stargazers_count
: 該用戶名下的所有 repo(不含 fork 的)的平均星星數user_organizations
: 該用戶屬於哪些組織user_has_null
: 該用戶至少有一個欄位是 nulluser_has_blog
: 該用戶有沒有網站user_is_freelancer
: 該用戶的 bio 中是否包含 Freelancer 等字眼user_is_junior
: 該用戶的 bio 中是否包含 Beginner 或 Junior 等字眼user_is_lead
: 該用戶的 bio 中是否包含 Team Lead、Architect、Creator、CTO 或 VP of Engineering 等字眼user_is_scholar
: 該用戶的 bio 中是否包含 Researcher、Scientist、PhD 或 Professor 等字眼user_is_pm
: 該用戶的 bio 中是否包含 Product Manager 等字眼user_knows_backend
: 該用戶的 bio 中是否包含 Backend 或 Back end 等字眼user_knows_data
: 該用戶的 bio 中是否包含 Machine Learning、Deep Learning 或 Data Science 等字眼user_knows_devops
: 該用戶的 bio 中是否包含 DevOps、SRE、SysAdmin 或 Infrastructure 等字眼user_knows_frontend
: 該用戶的 bio 中是否包含 Frontend 或 Front end 等字眼user_knows_mobile
: 該用戶的 bio 中是否包含 Mobile、iOS 或 Android 等字眼user_knows_recsys
: 該用戶的 bio 中是否包含 Recommender System、Data Mining 或 Information Retrieval 等字眼user_knows_web
: 該用戶的 bio 中是否包含 Web Development 或 Fullstack 等字眼
- 物品特徵
repo_created_at_days_since_today
: 該 repo 的建立日期距離今天過了幾天repo_updated_at_days_since_today
: 該 repo 的更新日期距離今天過了幾天repo_pushed_at_days_since_today
: 該 repo 的提交日期距離今天過了幾天repo_stargazers_count_in_30days
: 該 repo 在 30 天內收到的星星數repo_subscribers_stargazers_ratio
: 該 repo 的 watch 數和 star 數的比例repo_forks_stargazers_ratio
: 該 repo 的 fork 數和 star 數的比例repo_open_issues_stargazers_ratio
: 該 repo 的 數和 star 數的比例repo_releases_count
: 該 repo 的 release 或 tag 數repo_lisence
: 該 repo 的授權條款repo_readme
: 該 repo 的 README 內容repo_has_null
: 該 repo 有至少一個欄位是 nullrepo_has_readme
: 該 repo 是否有 README 檔案repo_has_changelog
: 該 repo 是否有 CHANGELOG 檔案repo_has_contributing
: 該 repo 是否有 CONTRIBUTING 檔案repo_has_tests
: 該 repo 是否有測試repo_has_ci
: 該 repo 是否有 CIrepo_has_dockerfile
: 該 repo 是否有 Dockerfilerepo_is_unmaintained
: 該 repo 是否不再維護了repo_is_awesome
: 該 repo 是否被收錄進任何的 awesome-xxx 列表裡repo_is_vinta_starred
: 該 repo 是否被 @vinta aka 本文的作者打星了
- 交互特徵
user_starred_repos_count
: 該用戶總共打星了多少 repouser_avg_daily_starred_repos_count
: 該用戶平均每天打星多少 repouser_forked_repos_count
: 該用戶總共 fork 了多少 repouser_follower_following_count_ratio
: 該用戶的 follower 數和 following 數的比例user_recent_searched_keywords
: 該用戶最近搜尋的 50 個關鍵字user_recent_commented_repos
: 該用戶最近留言的 50 個 repouser_recent_watched_repos
: 該用戶最近訂閱的 50 個 repouser_recent_starred_repos_descriptions
: 該用戶最近打星的 50 個 repo 的描述user_recent_starred_repos_languages
: 該用戶最近打星的 50 個 repo 的語言user_recent_starred_repos_topics
: 該用戶最近打星的 50 個 repo 的標籤user_follows_repo_owner
: 該用戶是否追蹤該 repo 的作者repo_language_index_in_user_recent_repo_languages
: 該 repo 的語言出現在該用戶最近打星的語言列表的順序repo_language_count_in_user_recent_repo_languages
: 該 repo 的語言出現在該用戶最近打星的語言列表的次數repo_topics_user_recent_topics_similarity
: 該 repo 的標籤與該用戶最近打星的標籤列表的相似度
- 上下文特徵
als_model_prediction
: 來自 ALS 模型的預測值,該用戶對該 repo 的偏好程度gbdt_model_index
: 來自 GBDT 模型的 tree index,該 observation 的自動特徵
Feature Engineering 特徵工程中常見的方法
https://vinta.ws/code/feature-engineering.html
Detect Outliers
除了缺失值之外,離群值(異常值)也是需要注意的地方。如果是 continuous 特徵,用 box plot 可以很快地發現離群值;如果是 categorical 特徵,可以 SELECT COUNT(*) ... GROUP BY
一下,然後用 bar chart 查看每個 category 的數量。取決於你所要解決的問題,異常值可能可以直接忽略,也可能需要特別對待,例如搞清楚異常值出現的原因,是資料採集時的差錯或是某種隱含的深層的因素之類的。
ref:
https://www.analyticsvidhya.com/blog/2016/01/guide-data-exploration/
https://www.slideshare.net/tw_dsconf/123-70852901
Impute Missing Values
可以利用 df.describe().show()
查看各個欄位的統計數據:count
、mean
、stddev
、min
和 max
。除了使用 df.where("some_column IS NULL")
之外,比較不同欄位的 count
差異也可以很快地發現哪些欄位有缺失值。順便觀察一下有缺失值的欄位和 target variable 有沒有什麼關聯。
這裡直接對 null
和 NaN
數據填充缺失值,因為以下幾個欄位都是字串類型,所以直接改成空字串,方便後續的處理。然後順便做一個 has_null
的特徵。
針對 user:
針對 repo:
ref:
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameNaFunctions
如果是數值類型的欄位可以考慮使用 Imputer
。
ref:
https://spark.apache.org/docs/latest/ml-features.html#imputer
Clean Data
針對 user,用 User-defined Function 對幾個文字欄位做一些正規化的處理:
針對 repo,過濾掉一些 repo_stargazers_count
太多和太少、description
欄位含有 "unmaintained" 或 "assignment" 等字眼的項目:
Construct Features
針對 user,根據上述的「發想特徵」,製作出新的特徵:
針對 repo,根據上述的「發想特徵」,製作出新的特徵,意思到了就好:
ref:
https://databricks.com/blog/2015/09/16/apache-spark-1-5-dataframe-api-highlights.html
Convert Features
針對 user,這裡主要是對一些 categorical 特徵作 binning:
針對 repo:
ref:
https://docs.databricks.com/spark/latest/mllib/binary-classification-mllib-pipelines.html
Prepare the Feature Pipeline
我們過濾掉那些打星了超多 repo 的用戶。從收集到的數據發現,有些用戶甚至打星了一兩萬個 repo,這些用戶可能是個爬蟲專用帳號或是他看到什麼就打星什麼,推薦系統對這樣的用戶來說可能沒什麼意義,還不如從數據集中拿掉。
Build the Feature Pipeline
把處理特徵的一連串流程寫成 Spark ML Pipeline,方便抽換或是加入新的 Transformer
,例如 Standardization、One-hot Encoding 和 Word2Vec,也把 ALS 模型的預測值做為其中一項特徵。
ref:
https://spark.apache.org/docs/latest/ml-pipeline.html
https://spark.apache.org/docs/latest/ml-features.html
Handle Imbalanced Data
因為我們要訓練一個 Binary Classification 二元分類模型,會同時需要 positive(正樣本)和 negative(負樣本)。但是我們的原始數據 rawStarringDS
都是正樣本,也就是說我們只有「用戶有對哪些 repo 打星的資料」(正樣本),卻沒有「用戶沒有對哪些 repo 打星的資料」(負樣本)。我們當然是可以用「所有用戶沒有打星的 repo 做為負樣本」,但是考慮到這種做法產生的負樣本的數量實在太大,而且也不太合理,因為那些用戶沒有打星的 repo 不見得是因為他不喜歡,可能只是因為他不知道有那個 repo 存在。
我們後來採用的做法是「用熱門但是用戶沒有打星的 repo 做為負樣本」,我們寫了一個 Spark Transformer 來做這件事:
ref:
https://github.com/vinta/albedo/blob/master/src/main/scala/ws/vinta/albedo/evaluators/RankingEvaluator.scala
http://www.kdnuggets.com/2017/06/7-techniques-handle-imbalanced-data.html
Split Data
直接使用 holdout 的方式,隨機分配不同的 row 到 training set 和 test set。其他的做法可能是根據時間來拆分,用以前的數據來預測之後的行為。
Build the Model Pipeline
為了方便之後的擴充性,這裡也使用 Spark ML Pipeline 的寫法。Spark ML 的 LogisticRegression
可以額外設置一個 weightCol
來調整不同 row 的權重。
ref:
https://spark.apache.org/docs/latest/ml-classification-regression.html
Evaluate the Model: Classification
因為 Logistic Regression 是二元分類模型,所以我們可以用 Spark ML 的 BinaryClassificationEvaluator
來評估結果。不過因為我們做的是推薦系統,真正在乎的是 Top N 的排序問題,所以這裡的 AUC 的數值參考一下就好。
ref:
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
https://docs.databricks.com/spark/latest/mllib/binary-classification-mllib-pipelines.html
Generate Candidates
推薦系統的另外一個重要部分就是產生候選物品集,這裡我們使用以下幾種方式:
- ALS: 協同過濾的推薦
- Content-based: 基於內容的推薦
- Popularity: 基於熱門的推薦
不過因為這篇文章的主題是排序和特徵工程的 Machine Learning Pipeline,所以產生候選物品集的部分就不多說了,有興趣的人可以直接看底下連結的 source code 或是這個系列的其他文章。
ref:
https://github.com/vinta/albedo/blob/master/src/main/scala/ws/vinta/albedo/recommenders/ALSRecommender.scala
https://github.com/vinta/albedo/blob/master/src/main/scala/ws/vinta/albedo/recommenders/ContentRecommender.scala
https://github.com/vinta/albedo/blob/master/src/main/scala/ws/vinta/albedo/recommenders/PopularityRecommender.scala
Predict the Ranking
把這些候選物品集丟給我們訓練好的 Logistic Regression 模型來排序。結果中的 probability
欄位的第 0 項表示結果為 0 的機率(negative)、第 1 項表示結果為 1 的機率(positive)。
Evaluate the Model: Ranking
最後我們使用 Information Retrieval 領域中用來評價排序能力的指標 NDCG (Normalized Discounted Cumulative Gain) 來評估排序的結果。Spark MLlib 有現成的 RankingMetrics
可以用,但是它只適用於 RDD-based 的 API,所以我們改寫成適合 DataFrame-based 的 Evaluator
。
ref:
https://github.com/vinta/albedo/blob/master/src/main/scala/ws/vinta/albedo/evaluators/RankingEvaluator.scala
https://spark.apache.org/docs/latest/mllib-evaluation-metrics.html#ranking-systems
https://weekly.codetengu.com/issues/83#kOxuVxW