Feature Engineering 特徵工程中常見的方法

Feature Engineering 特徵工程中常見的方法

Feature Engineering 是把 raw data 轉換成 features 的整個過程的總稱。基本上特徵工程就是個手藝活,講求的是創造力。

本文不定期更新中。

Missing Value Imputation

最簡單暴力的做法當然就是直接 drop 掉那些含有缺失值的 rows。

針對 numerical 特徵的缺失值,可以用以下方式取代:

  • 0,缺點是可能會混淆其他本來就是 0 的數值
  • -999,用某個正常情況下不會出現的數值代替,但是選得不好可能會變成異常值,要特別對待
  • Mean,平均數
  • Median,中位數,跟平均數相比,不會被異常值干擾

針對 categorical 特徵的缺失值,可以用以下方式取代:

  • Mode,眾數,最常見的值
  • 改成 "Others" 之類的值

假設你要填補 age 這個特徵,然後你有其他例如 gender 這樣的特徵,你可以分別計算男性和女性的 age 的 mean、median 和 mode 來填補缺失值;更複雜一點的方式是,你可以把沒有缺失值的數據挑出來,用它們來訓練一個 regression 或 classification 模型,用這個模型來預測缺失值。

不過其實有些演算法是可以容許缺失值的,這時候可以新增一個 has_missing_value 欄位(稱為 NA indicator column)。

ref:
http://adataanalyst.com/machine-learning/comprehensive-guide-feature-engineering/
https://stats.stackexchange.com/questions/28860/why-adding-an-na-indicator-column-instead-of-value-imputation-for-randomforest

Outliers Detection

發現離群值最直觀的方式就是畫圖表,針對單一特徵可以使用 box plot;兩兩特徵則可以使用 scatter plot。

處置離群值的方式通常是直接刪除或是做變換(例如 log transformation 或 binning),當然你也可以套用處理缺失值的方式。

ref:
https://www.analyticsvidhya.com/blog/2016/01/guide-data-exploration/
https://www.douban.com/note/413022836/

Duplicate Entries Removal

duplicate 或 redundant 尤其指的是那些 features 都一樣,但是 target variable 卻不同的數據。

Feature Scaling 特徵縮放

Standardization 標準化

原始資料中,因為各個特徵的含義和單位不同,每個特徵的取值範圍可能會差異很大。例如某個二元特徵的範圍是 0 或 1,另一個價格特徵的範圍可能是 [0, 1000000],由於取值範圍相差過大導致了模型可能會更偏向於取值範圍較大的那個特徵。解決的辦法就是把各種不同 scale 的特徵轉換成同樣的 scale,稱為標準化或正規化。

狹義來說,標準化專門指的是透過計算 z-score,讓數據的 mean 為 0、 variance 為 1。

ref:
https://spark.apache.org/docs/latest/ml-features.html#standardscaler
http://scikit-learn.org/stable/modules/preprocessing.html#standardization-or-mean-removal-and-variance-scaling
https://www.quora.com/How-bad-is-it-to-standardize-dummy-variables

Normalization 歸一化、正規化

歸一化是指把每個樣本縮放到單位範數(每個樣本的範數為 1),適用於計算 dot product 或者兩個樣本之間的相似性。除了標準化、歸一化之外,其他還有透過最大、最小值,把數據的範圍縮放到 [0, 1] 或 [-1, 1] 的區間縮放法,不過這個方法容易受異常值的影響。

標準化是分別對單一特徵進行(針對 column);歸一化是對每個 observation 進行(針對 row)。

對 SVM、logistic regression 或其他使用 squared loss function 的演算法來說,需要 standardization;對 Vector Space Model 來說,需要 normalization;至於 tree-based 的演算法,基本上都不需要標準化或歸一化,它們對 scale 不敏感。

ref:
https://spark.apache.org/docs/latest/ml-features.html#normalizer
http://scikit-learn.org/stable/modules/preprocessing.html#normalization
https://www.qcloud.com/community/article/689521

Feature Transformation 特徵變換

以下適用 continuous 特徵:

Rounding

某些精度有到小數點後第 n 位的特徵,如果你其實不需要那麼精確,可以考慮 round(value * m)round(log(value)) 這樣的做法,甚至可以把 round 之後的數值當成 categorical 特徵。

confidence  round(confidence * 10)
0.9594      10
0.1254      1
0.1854      2
0.5454      5
0.3655      4

Log Transformation

因為 x 越大,log(x) 增長的速度就越慢,所以取 log 的意義是可以 compress 大數和 expand 小數,換句話說就是壓縮 "long tail" 和展開 "head"。假設 x 原本的範圍是 [100, 1000],log(x, 10) 之後的範圍就變成 [2, 3] 了。也常常使用 log(1 + x)log(x / (1 - x))

另外一種類似的做法是 square root 平方根或 cube root 立方根(可以用在負數)。

ref:
https://www.safaribooksonline.com/library/view/mastering-feature-engineering/9781491953235/ch02.html

Binarization 二值化

對數值型的數據設定一個 threshold,大於就賦值為 1、小於就賦值為 0。例如 score,如果你只關心「及格」或「不及格」,可以直接把成績對應到 1(score >= 60)和 0(score < 60)。或是你要做啤酒銷量分析,你可以新增一個 age >= 18 的特徵來標示出已成年。

你有一個 color 的 categorical 特徵,如果你不在乎實際上是什麼顏色的話,其實也可以改成 has_color

ref:
https://spark.apache.org/docs/latest/ml-features.html#binarizer

Binning

也稱為 bucketization。

age 這樣的特徵為例,你可以把所有年齡拆分成 n 段,0-20 歲、20-40 歲、40-60 歲等或是 0-18 歲、18-40 歲、40-70 歲等(等距或等量),然後把個別的年齡對應到某一段,假設 26 歲是對應到第二個 bucket,那新特徵的值就是 2。這種方式是人為地指定每個 bucket 的邊界值,還有另外一種拆分法是根據數據的分佈來拆,稱為 quantization 或 quantile binning,你只需要指定 bucket 的數量即可。

同樣的概念應用到其他地方,可以把 datetime 特徵拆分成上午、中午、下午和晚上;如果是 categorical 特徵,則可以先 SELECT count() ... GROUP BY,然後把出現次數小於某個 threshold 的值改成 "Other" 之類的。或者是你有一個 occupation 特徵,如果你其實不需要非常準確的職業資訊的話,可以把 "Web Developer"、"iOS Developer" 或 "DBA" 這些個別的資料都改成 "Software Engineer"。

binarization 和 binning 都是對 continuous 特徵做 discretization 離散化,增強模型的非線性泛化能力。

ref:
https://spark.apache.org/docs/latest/ml-features.html#bucketizer
https://spark.apache.org/docs/latest/ml-features.html#quantilediscretizer
https://github.com/collectivemedia/spark-ext#optimal-binning
https://www.qcloud.com/community/article/689521

以下適用 categorical 特徵:

ref:
https://en.wikipedia.org/wiki/Categorical_variable
https://www.safaribooksonline.com/library/view/introduction-to-machine/9781449369880/ch04.html

Integer Encoding

也稱為 label encoding。

把每個 category 對應到數字,一種做法是隨機對應到 0, 1, 2, 3, 4 等數字;另外一種做法是依照該值出現的頻率大小的順序來給值,例如最常出現的值給 0,依序給 1, 2, 3 等等。如果是針對一些在某種程度上有次序的 categorical 特徵(稱為 ordinal),例如「鑽石會員」「白金會員」「黃金會員」「普通會員」,直接 mapping 成數字可能沒什麼問題,但是如果是類似 colorcity 這樣的沒有明顯大小的特徵的話,還是用 one-hot encoding 比較合適。不過如果用的是 tree-based 的演算法就無所謂了。

有些 categorical 特徵也可能會用數字表示(例如 id),跟 continuous 特徵的差別是,數值的差異或大小對 categorical 特徵來說沒有太大的意義。

ref:
http://breezedeus.github.io/2014/11/15/breezedeus-feature-processing.html
http://phunters.lofter.com/post/86d56_194e956

One-hot Encoding (OHE)

如果某個特徵有 m 種值(例如 Taipei, Beijing, Tokyo),那它 one-hot encode 之後就會變成長度為 m 的向量:

city    city_Taipei city_Beijing city_tokyo
Taipei  1           0            0
Beijing 0           1            0
Tokyo   0           0            1

你也可以改用 Dummy coding,這樣就只需要產生長度為 m -1 的向量:

city    city_Taipei city_Beijing
Taipei  1           0
Beijing 0           1
Tokyo   0           0

OHE 的缺點是容易造成特徵的維度大幅增加和沒辦法處理之前沒見過的值。

ref:
http://scikit-learn.org/stable/modules/preprocessing.html#preprocessing-categorical-features
https://blog.myyellowroad.com/using-categorical-data-in-machine-learning-with-python-from-dummy-variables-to-deep-category-66041f734512

Bin-counting

例如在 Computational Advertising 中,如果你有針對每個 user 的「廣告曝光數(包含點擊和未點擊)」和「廣告點擊數」,你就可以算出每個 user 的「點擊率」,然後用這個機率來表示每個 user,反之也可以對 ad id 使用類似的做法。

ad_id   ad_views  ad_clicks  ad_ctr
412533  18339     1355       0.074
423334  335       12         0.036
345664  1244      132        0.106
349833  35387     1244       0.035

ref:
https://blogs.technet.microsoft.com/machinelearning/2015/02/17/big-learning-made-easy-with-counts/

換個思路,如果你有一個 brand 的特徵,然後你可以從 user 的購買記錄中找出購買 A 品牌的人,有 70% 的人會購買 B 品牌、有 40% 的人會購買 C 品牌;購買 D 品牌的人,有 10% 的人會購買 A 品牌和 E 品牌,你可以每個品牌表示成這樣:

brand  A    B    C    D    E
A      1.0  0.7  0.4  0.0  0.0
B      ...
C      ...
D      0.1  0.0  0.0  1.0  0.1
E      ...

ref:
http://phunters.lofter.com/post/86d56_194e956

LabelCount Encoding

類似 Bin-cunting 的做法,一樣是利用現有的 count 或其他統計上的資料,差別在於 LabelCount Encoding 最後用的是次序而不是數值本身。優點是對異常值不敏感。

ad_id   ad_clicks  ad_rank
412533  1355       1
423334  12         4
345664  132        3
349833  1244       2

ref:
https://www.slideshare.net/gabrielspmoreira/feature-engineering-getting-most-out-of-data-for-predictive-models-tdc-2017/47

Count Vectorization

除了可以用在 text 特徵之外,如果你有 comma-seperated 的 categorical 特徵也可以使用這個方法。例如電影類型 genre,裡頭的值長這樣 Action,Sci-Fi,Drama,就可以先用 RegexTokenizer 轉成 Array("action", "sci-fi", "drama"),再用 CountVectorizer 轉成 vector。

ref:
https://spark.apache.org/docs/latest/ml-features.html#countvectorizer

Feature Hashing

以 user id 為例,透過一個 hash function 把每一個 user id 映射到 (hashed1_, hashed_2, ..., hashed_m) 的某個值。指定 m << user id 的取值範圍,所以缺點是會有 collision(如果你的 model 足夠 robust,倒也是可以不管),優點是可以良好地處理之前沒見過的值和罕見的值。當然不只可以 hash 單一值,也可以 hash 一個 vector。

你可以把 feature hashing 表示為單一欄位的數值(例如 2)或是類似 one-hot encoding 那樣的多欄位的 binary 表示法(例如 [0, 0, 1])。

import hashlib

def hash_func(s, n_bins=100000):
    s = s.encode('utf-8')
    return int(hashlib.md5(s).hexdigest(), 16) % (n_bins - 1) + 1

print(hash_func('some categorical value'))

ref:
https://github.com/apache/spark/pull/18513
https://spark.apache.org/docs/latest/ml-features.html#feature-transformation
https://www.slideshare.net/gabrielspmoreira/feature-engineering-getting-most-out-of-data-for-predictive-models-tdc-2017/42

Mean Encoding

ref:
https://zhuanlan.zhihu.com/p/26308272

Category Embedding

ref:
https://arxiv.org/abs/1604.06737
https://www.slideshare.net/HJvanVeen/feature-engineering-72376750/17
https://blog.myyellowroad.com/using-categorical-data-in-machine-learning-with-python-from-dummy-variables-to-deep-category-42fd0a43b009

User Profile 用戶畫像

使用用戶畫像來表示每個 user id,例如用戶的年齡、性別、職業、收入、居住地、偏好的各種 tag 等,把每個 user 表示成一個 feature vector。除了單一維度的特徵之外,也可以建立「用戶聽過的歌都是哪些曲風」、「用戶(30 天內)瀏覽過的文章都是什麼分類,以 TF-IDF 的方式表達。或者是把用戶所有喜歡文章對應的向量的平均值作為此用戶的 profile。比如某個用戶經常關注與推薦系統有關的文章,那麼他的 profile 中 "CB"、"CF" 和 "推薦系統" 對應的權重值就會較高。

ref:
https://mp.weixin.qq.com/s/w87-dyG9Ap9xJ_HZu0Qn-w
https://medium.com/unstructured/how-feature-engineering-can-help-you-do-well-in-a-kaggle-competition-part-i-9cc9a883514d

Rare Categorical Variables

先計算好每一種 category 的數量,然後把小於某個 threshold 的 category 都改成 "Others" 之類的值。或是使用 clustering 演算法來達到同樣的目的。你也可以直接建立一個新的 binary feature 叫做 rare,要來標示那些相對少見的資料點。

Unseen Categorical Variables

以 Spark ML 為例,當你用 training set 的資料 fit 了一個 StringIndexer(和 OneHotEncoder),把它拿去用在 test set 上時,有一定的機率你會遇到某些 categorical 特徵的值只在 test set 出現,所以對只見過 training set 的 transformer 來說,這些就是所謂的 unseen values。

對付 unseen values 通常有幾種做法:

  • 用整個 training set + test set 來編碼 categorical 特徵
  • 直接捨棄含有 unseen values 的那筆資料
  • 把 unseen values 改成 "Others" 之類的已知值。StringIndexer.setHandleInvalid("keep") 基本上就是這種做法

如果採用第一種方式,一但你把這個 transformer 拿到 production 去用時,無可避免地還是會遇到 unseen values。不過通常線上的 feature engineering 會有別的做法,例如事先把 user 或 item 的各項特徵都算好(定期更新或是 data 產生的時候觸發),然後以 id 為 key 存進 Redis 之類的 NoSQL 裡,model 要用的時候直接用 user id / item id 拿到處理好的 feature vector。

ref:
https://stackoverflow.com/questions/34681534/spark-ml-stringindexer-handling-unseen-labels

Large Categorical Variables

針對那種非常大的 categorical 特徵(例如 id 類的特徵),如果你用的是 logistic regression,其實可以硬上 one-hot encoding。不然就是利用上面提到的 feature hashing 或 bin counting 等方式;如果是 GBDT 的話,甚至可以直接用 id 硬上,只要 tree 足夠多。

ref:
https://www.zhihu.com/question/34819617

Feature Construction 特徵建構

特徵構建指的是從原有的特徵中,人工地創造出新的特徵,通常用來解決一般的線性模型沒辦法學到非線性特徵的問題。其中一個重點可能是能不能夠過某些辦法,在特徵中加入某些「額外的資訊」,雖然也得小心數據偏見的問題。

如果你有很多 user 購物的資料,除了可以 aggregate 得到 total spend 這樣的 feature 之外,也可以變換一下,變成 spend in last weekspend in last monthspend in last year 這種可以表示「趨勢」的特徵。

範例:

  • author_avg_page_view: 該文章作者的所有文章的平均瀏覽數
  • user_visited_days_since_doc_published: 該文章發布到該用戶訪問經過了多少天
  • user_history_doc_sim_categories: 用戶讀過的所有文章的分類和該篇文章的分類的 TF-IDF 的相似度
  • user_history_doc_sim_topics: 用戶讀過的所有文章的內文和該篇文章的內文的 TF-IDF 的相似度

ref:
https://medium.com/unstructured/how-feature-engineering-can-help-you-do-well-in-a-kaggle-competition-part-i-9cc9a883514d
https://www.safaribooksonline.com/library/view/large-scale-machine/9781785888748/ch04s02.html
https://www.slideshare.net/HJvanVeen/feature-engineering-72376750/23

Temporal Features 時間特徵

對於 date / time 類型的資料,除了轉換成 timestamp 和取出 day、month 和 year 做成新的欄位之外,也可以對 hour 做 binning(分成上午、中午、晚上之類的)或是對 day 做 binning(分成工作日、週末);或是想辦法查出該日期當天的天氣、節日或活動等訊息,例如 is_national_holidayhas_sport_events

更進一步,用 datetime 類的資料通常也可以做成 spend_hours_last_weekspend_money_last_week 這種可以用來表示「趨勢」的特徵。

Text Features 文字特徵

ref:
https://www.slideshare.net/HJvanVeen/feature-engineering-72376750/57

Spatial Features 地理特徵

如果你有 cityaddress 等特徵,可以新建出 latitudelongitude 兩個 features(當然你得透過外部的 API 或資料來源才做得到),再組合出 median_income_within_2_miles 這樣的特徵。

ref:
https://www.slideshare.net/HJvanVeen/feature-engineering-72376750/47

Cyclical Features

ref:
http://blog.davidkaleko.com/feature-engineering-cyclical-features.html

Features Interaction 特徵交互

假設你有 AB 兩個 continuous 特徵,你可以用 A + BA - BA * BA / B 之類的方式建立新的特徵。例如 house_age_at_purchase = house_built_date - house_purchase_date 或是 click_through_rate = n_clicks / n_impressions

還有一種類似的作法叫 Polynomial Expansion 多項式展開,當 degree 為 2 時,可以把 (x, y) 兩個特徵變成 (x, x * x, y, x * y, y * y) 五個特徵。

ref:
https://spark.apache.org/docs/latest/ml-features.html#polynomialexpansion
https://elitedatascience.com/feature-engineering-best-practices

Feature Combination 特徵組合

也稱為特徵交叉。

特徵組合主要是針對 categorical 特徵,特徵交互則是適用於 continuous 特徵。但是兩者的概念是差不多的,就是把兩個以上的特徵透過某種方式結合在一起,變成新的特徵。通常用來解決一般的線性模型沒辦法學到非線性特徵的問題。

假設有 genderwealth 兩個特徵,分別有 2 和 3 種取值,最簡單的方式就是直接 string concatenation 組合出一個新的特徵 gender_wealth,共有 2 x 3 = 6 種取值。因為是 categorical 特徵,可以直接對 gender_wealth 使用 StringIndexerOneHotEncoder。你當然也可以一起組合 continuous 和 categorical 特徵,例如 age_wealth 這樣的特徵,只是 vector 裡的值就不是 0 1 而是 age 本身了。

假設 C 是 categorical 特徵,N 是 continuous 特徵,以下有幾種有意義的組合:

  • median(N) GROUP BY C 中位數
  • mean(N) GROUP BY C 算術平均數
  • mode(N) GROUP BY C 眾數
  • min(N) GROUP BY C 最小值
  • max(N) GROUP BY C 最大值
  • std(N) GROUP BY C 標準差
  • var(N) GROUP BY C 方差
  • N - median(N) GROUP BY C
user_id  age  gender  wealth  gender_wealth  gender_wealth_ohe   age_wealth
1        56   male    rich    male_rich      [1, 0, 0, 0, 0, 0]  [56, 0, 0]
2        30   male    middle  male_middle    [0, 1, 0, 0, 0, 0]  [0, 30, 0]
3        19   female  rich    female_rich    [0, 0, 0, 1, 0, 0]  [19, 0, 0]
4        62   female  poor    female_poor    [0, 0, 0, 0, 0, 1]  [0, 0, 62]
5        78   male    poor    male_poor      [0, 0, 1, 0, 0, 0]  [0, 0, 78]
6        34   female  middle  female_middle  [0, 0, 0, 0, 1, 0]  [0, 34, 0]

ref:
http://breezedeus.github.io/2014/11/15/breezedeus-feature-processing.html
http://phunters.lofter.com/post/86d56_194e956
https://zhuanlan.zhihu.com/p/26444240
http://blog.csdn.net/mytestmy/article/details/40933235

Feature Extraction 特徵提取

通常就是指 dimensionality reduction。

  • Principal Component Analysis (PCA)
  • Latent Dirichlet Allocation (LDA)
  • Latent Semantic Analysis (LSA)

Feature Selection 特徵選擇

特徵選擇是指透過某些方法自動地從所有的特徵中挑選出有用的特徵。

ref:
http://scikit-learn.org/stable/modules/feature_selection.html

Filter Method

採用某一種評估指標(發散性、相關性或 Information Gain 等),單獨地衡量個別特徵跟 target variable 之間的關係,常用的方法有 Chi Square Test(卡方檢驗)。這種特徵選擇方式沒有任何模型的參與。

以相關性來說,也不見得跟 target variable 的相關性越高就越好,

ref:
https://spark.apache.org/docs/latest/ml-features.html#chisqselector
http://files.cnblogs.com/files/XBWer/%E6%9C%BA%E5%99%A8%E5%AD%A6%E4%B9%A0%E3%81%AE%E7%89%B9%E5%BE%81.pdf

Wrapper Method

會採用某個模型來預測你的 target variable,把特徵選擇想成是一個組合優化的問題,想辦法找出一組特徵子集能夠讓模型的評估結果最好。缺點是太耗時間了,實務上不常用。

ref:
http://www.cnblogs.com/heaad/archive/2011/01/02/1924088.html

Embedded Method

通常會採用一個會為特徵賦予 coefficients 或 importances 的演算法,例如 Logistic Regression(特別是使用 L1 penalty)或 GBDT,直接用權重或重要性對所有特徵排序,然後取前 n 個作為特徵子集。

ref:
http://scikit-learn.org/stable/modules/feature_selection.html#feature-selection-using-selectfrommodel
https://www.zhihu.com/question/28641663

Feature Learning 特徵學習

也稱為 Representation Learning 或 Automated Feature Engineering。

  • GBDT
  • Neural Network: Restricted Boltzmann Machines
  • Deep Learning: Autoencoder

ref:
https://zhuanlan.zhihu.com/p/26444240

Data Leakage 數據洩漏

就是指你在 features 中直接或間接地加入了跟 target variable 有關的數據。

ref:
https://zhuanlan.zhihu.com/p/26444240

Target Engineering

雖然不能算是 feature engineering 的一部分,但是其實你也可以對 target variable / label(就是你的模型要預測的那個值)做點變換。例如 log(y + 1)exp(y) - 1

碼天狗週刊 第 99 期 @vinta - Apache Spark, Python, Machine Learning, Feature Engineering, Testing, Linux

碼天狗週刊 第 99 期 @vinta - Apache Spark, Python, Machine Learning, Feature Engineering, Testing, Linux

本文同步發表於 CodeTengu Weekly - Issue 99

Spark SQL cookbook (Python)

最近在為 StreetVoice 開發一個音樂的推薦系統,採用 Apache Spark,不過因為老是忘記 DataFrame 某某功能的用法,所以就乾脆仿效 O'Reilly 著名的 Cookbook 系列,幫自己寫了一篇 Spark SQL cookbook,複習、速查兩相宜啊。

因為 Spark 支援 Scala、Java、Python 和 R,一開始是打算用 Scala 來練練功的,不過畢竟是公司的專案,考慮到後續其他人的參與和維護,好像還是採用一個團隊成員都熟悉的語言比較好吶(成熟的大人.jpg)。

延伸閱讀:

How to Size Executors, Cores and Memory for a Spark application running in memory

在使用 spark-submit 的時候可以指定 --driver-memory--executor-memory--executor-cores--num-executors 等參數來配置你的 Spark app 可以使用的運算資源,這篇文章指出了幾個需要注意的地方以及 One executor per core 和 One executor per node 這兩種做法會有什麼問題。

P.S. 現在 Spark 除了 Standalone 和 YARN 模式之外,也開始實驗性地支援 Kubernetes 了:apache-spark-on-k8s,看樣子 k8s 真的有一統江湖之勢了啊。

Mastering Feature Engineering

整個推薦系統的 pipeline 可以很粗略地分成 candidate generation 和 ranking 兩個部分,而 ranking 常用的模型之一就是簡單粗暴的 Logistic Regression(通常還會搭配 GBDT 或 Deep Neural Networks)。因為要用 LR 需要大量的 Feature Engineering,所以我就特地找了一本專門在講特徵工程的書,上週末去剪頭髮的時候終於讀完,正好可以推薦給大家。

不過這本書講的是比較基礎的部分(不要想一步登天嘛),例如針對數值特徵的 Binning 或標準化、針對文字特徵的 TF-IDF 和針對類別特徵的 One-hot encoding 或 Feature hashing,對創建出非線性特徵的 Feature Construction 則沒有什麼著墨。可以搭配前幾期推薦過的「机器学习中的数据清洗与特征处理综述」一起看。

Write Explicit Tests

Sometimes, normal programming good practices don’t apply to software tests. DRY in particular I don’t subscribe to for test code, because I want my tests to read like a story. - Kent Beck 如是說

你減少了重複,但是卻帶來了耦合。寫程式真的很難啊。

Strace - The SysAdmin's Microscope

strace 是個可以用來觀測某個 script 或 process 在 system call 這個層面到底做了哪些事的指令,是 troubleshooting 的好幫手,尤其是用來解決在 Linux 上大家喜聞樂見的「幹你娘為什麼 xxx 跑不起來?!(20 分鐘之後)噢我權限設錯了」的問題。

延伸閱讀:

Build a recommender system with Spark: Implicit ALS

Build a recommender system with Spark: Implicit ALS

在這個系列的文章裡,我們將使用 Apache Spark、XGBoost、Elasticsearch 和 MySQL 等工具來搭建一個推薦系統的 Machine Learning Pipeline。推薦系統的組成可以粗略地分成 Candidate Generation 和 Ranking 兩個部分,前者是針對用戶產生候選物品集,常用的方法有 Collaborative Filtering、Content-based、標籤配對、熱門排行或人工精選等;後者則是對這些候選物品排序,以 Top N 的方式呈現最終的推薦結果,常用的方法有 Logistic Regression。

在本篇文章中,我們將以 Candidate Generation 階段常用的方法之一:Collaborative Filtering 協同過濾演算法為例,利用 Apache Spark 的 ALS (Alternating Least Squares) 模型建立一個 GitHub repositories 的推薦系統,以用戶對 repo 的打星紀錄作為訓練數據,推薦出用戶可能會感興趣的其他 repo 作為候選物品集。

完整的程式碼可以在 https://github.com/vinta/albedo 找到。

系列文章:

Submit the Application

因為需要使用 JDBC 讀取 MySQL 資料庫,必須安裝 MySQL driver,可以透過 --packages "mysql:mysql-connector-java:5.1.41" 參數在 cluster 的每一台機器上安裝需要的 Java packages。

$ spark-submit \
--packages "com.github.fommil.netlib:all:1.1.2,mysql:mysql-connector-java:5.1.41" \
--master spark://YOUR_SPARK_MASTER:7077 \
--py-files deps.zip \
train_als.py -u vinta

ref:
https://spark.apache.org/docs/latest/submitting-applications.html

Load Data

讀取來自 MySQL 資料庫的數據。你可以使用 predicates 參數來指定 WHERE 條件,雖然嚴格來說這個參數是用來控制 partition 數量的,一個條件就是一個 partition。

假設 app_repostarring 的欄位如下:

CREATE TABLE `app_repostarring` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `from_user_id` int(11) NOT NULL,
  `from_username` varchar(39) NOT NULL,
  `repo_owner_id` int(11) NOT NULL,
  `repo_owner_username` varchar(39) NOT NULL,
  `repo_owner_type` varchar(16) NOT NULL,
  `repo_id` int(11) NOT NULL,
  `repo_name` varchar(100) NOT NULL,
  `repo_full_name` varchar(140) NOT NULL,
  `repo_description` varchar(191) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT NULL,
  `repo_language` varchar(32) NOT NULL,
  `repo_created_at` datetime(6) NOT NULL,
  `repo_updated_at` datetime(6) NOT NULL,
  `starred_at` datetime(6) NOT NULL,
  `stargazers_count` int(11) NOT NULL,
  `forks_count` int(11) NOT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `from_user_id_repo_id` (`full_name`, `repo_id`)
);
def loadRawData():
    url = 'jdbc:mysql://127.0.0.1:3306/albedo?user=root&password=123&verifyServerCertificate=false&useSSL=false'
    properties = {'driver': 'com.mysql.jdbc.Driver'}
    rawDF = spark.read.jdbc(url, table='app_repostarring', properties=properties)
    return rawDF

rawDF = loadRawData()

ref:
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=jdbc#pyspark.sql.DataFrameReader.jdbc
http://www.gatorsmile.io/numpartitionsinjdbc/

Preprocess Data

Format Data

把 raw data 整理成 user,item,rating,starred_at 這樣的格式。starred_at 只有評價 model 時有用來排序,訓練 model 時並沒有用到,因為 Spark 的 ALS 沒辦法輕易地整合 side information。

from pyspark.ml import Transformer

class RatingBuilder(Transformer):

    def _transform(self, rawDF):
        ratingDF = rawDF \
            .selectExpr('from_user_id AS user', 'repo_id AS item', '1 AS rating', 'starred_at') \
            .orderBy('user', F.col('starred_at').desc())
        return ratingDF

ratingBuilder = RatingBuilder()
ratingDF = ratingBuilder.transform(rawDF)
ratingDF.cache()

ref:
http://blog.ethanrosenthal.com/2016/11/07/implicit-mf-part-2/

Inspect Data

import pyspark.sql.functions as F

ratingDF.rdd.getNumPartitions()
# 200

ratingDF.agg(F.count('rating'), F.countDistinct('user'), F.countDistinct('item')).show()
# +-------------+--------------------+--------------------+
# |count(rating)|count(DISTINCT user)|count(DISTINCT item)|
# +-------------+--------------------+--------------------+
# |      3121629|               10483|              551216|
# +-------------+--------------------+--------------------+

stargazersCountDF = ratingDF \
    .groupBy('item') \
    .agg(F.count('user').alias('stargazers_count')) \
    .orderBy('stargazers_count', ascending=False)
stargazersCountDF.show(10)
# +--------+----------------+
# |    item|stargazers_count|
# +--------+----------------+
# | 2126244|            2211|
# |10270250|            1683|
# |  943149|            1605|
# |  291137|            1567|
# |13491895|            1526|
# | 9384267|            1480|
# | 3544424|            1468|
# | 7691631|            1441|
# |29028775|            1427|
# | 1334369|            1399|
# +--------+----------------+

starredCountDF = ratingDF \
    .groupBy('user') \
    .agg(F.count('item').alias('starred_count')) \
    .orderBy('starred_count', ascending=False)
starredCountDF.show(10)
# +-------+-------------+
# |   user|starred_count|
# +-------+-------------+
# |3947125|         8947|
# |5527642|         7978|
# | 446613|         7860|
# | 627410|         7800|
# |  13998|         6334|
# |2467194|         6327|
# |  63402|         6034|
# |2005841|         6024|
# |5073946|         5980|
# |   2296|         5862|
# +-------+-------------+

Clean Data

你可以過濾掉那些太少 user 打星的 item 和打星了太少 item 的 user,提昇矩陣的稠密度。這個現象也正好是 Cold Start 的問題,你就是沒有足夠多的關於這些 item 和 user 的數據(可以考慮使用 content-based 的推薦方式)。除此之外,如果你的推薦系統所推薦的 item 只有非常少人打星,即便你完美地挖掘了長尾效應,這樣的推薦結果給用戶的「第一印象」可能也不會太好(這可能決定了他要不要繼續使用這個系統或是他要不要真的去嘗試那個你推薦給他的東西)。

你也可以選擇要不要過濾掉那些超多人打星的 item 和打星了超多 item 的 user。如果某些 item 有超過八、九成的 user 都打星了,對於這麼熱門的 item,可能也沒有推薦的必要了,因為其他 user 早晚也會自己發現的;如果有少數的 user 幾乎打星了一半以上的 item,這些 user 可能是屬於某種 web crawler 的用途或是這些 user 就是那種看到什麼就打星什麼的人,無論是哪一種,他們可能都不是你想要 modeling 的對象,可以考慮從 dataset 中拿掉。

實務上,如果你有關於 user 或 item 的黑名單,例如一些 SPAM 帳號或 NSFW 的內容等,也可以在這個步驟把它們過濾掉。

from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import Param
import pyspark.sql.functions as F

class DataCleaner(Transformer):

    @keyword_only
    def __init__(self, minItemStargazersCount=None, maxItemStargazersCount=None, minUserStarredCount=None, maxUserStarredCount=None):
        super(DataCleaner, self).__init__()
        self.minItemStargazersCount = Param(self, 'minItemStargazersCount', '移除 stargazer 數低於這個數字的 item')
        self.maxItemStargazersCount = Param(self, 'maxItemStargazersCount', '移除 stargazer 數超過這個數字的 item')
        self.minUserStarredCount = Param(self, 'minUserStarredCount', '移除 starred repo 數低於這個數字的 user')
        self.maxUserStarredCount = Param(self, 'maxUserStarredCount', '移除 starred repo 數超過這個數字的 user')
        self._setDefault(minItemStargazersCount=1, maxItemStargazersCount=50000, minUserStarredCount=1, maxUserStarredCount=50000)
        kwargs = self.__init__._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, minItemStargazersCount=None, maxItemStargazersCount=None, minUserStarredCount=None, maxUserStarredCount=None):
        kwargs = self.setParams._input_kwargs
        return self._set(**kwargs)

    def setMinItemStargazersCount(self, value):
        self._paramMap[self.minItemStargazersCount] = value
        return self

    def getMinItemStargazersCount(self):
        return self.getOrDefault(self.minItemStargazersCount)

    def setMaxItemStargazersCount(self, value):
        self._paramMap[self.maxItemStargazersCount] = value
        return self

    def getMaxItemStargazersCount(self):
        return self.getOrDefault(self.maxItemStargazersCount)

    def setMinUserStarredCount(self, value):
        self._paramMap[self.minUserStarredCount] = value
        return self

    def getMinUserStarredCount(self):
        return self.getOrDefault(self.minUserStarredCount)

    def setMaxUserStarredCount(self, value):
        self._paramMap[self.maxUserStarredCount] = value
        return self

    def getMaxUserStarredCount(self):
        return self.getOrDefault(self.maxUserStarredCount)

    def _transform(self, ratingDF):
        minItemStargazersCount = self.getMinItemStargazersCount()
        maxItemStargazersCount = self.getMaxItemStargazersCount()
        minUserStarredCount = self.getMinUserStarredCount()
        maxUserStarredCount = self.getMaxUserStarredCount()

        toKeepItemsDF = ratingDF \
            .groupBy('item') \
            .agg(F.count('user').alias('stargazers_count')) \
            .where('stargazers_count >= {0} AND stargazers_count <= {1}'.format(minItemStargazersCount, maxItemStargazersCount)) \
            .orderBy('stargazers_count', ascending=False) \
            .select('item', 'stargazers_count')
        temp1DF = ratingDF.join(toKeepItemsDF, 'item', 'inner')

        toKeepUsersDF = temp1DF \
            .groupBy('user') \
            .agg(F.count('item').alias('starred_count')) \
            .where('starred_count >= {0} AND starred_count <= {1}'.format(minUserStarredCount, maxUserStarredCount)) \
            .orderBy('starred_count', ascending=False) \
            .select('user', 'starred_count')
        temp2DF = temp1DF.join(toKeepUsersDF, 'user', 'inner')

        cleanDF = temp2DF.select('user', 'item', 'rating', 'starred_at')
        return cleanDF

dataCleaner = DataCleaner(
    minItemStargazersCount=2,
    maxItemStargazersCount=4000,
    minUserStarredCount=2,
    maxUserStarredCount=5000
)
cleanDF = dataCleaner.transform(ratingDF)

cleanDF.agg(F.count('rating'), F.countDistinct('user'), F.countDistinct('item')).show()
# +-------------+--------------------+--------------------+
# |count(rating)|count(DISTINCT user)|count(DISTINCT item)|
# +-------------+--------------------+--------------------+
# |      2761118|               10472|              245626|
# +-------------+--------------------+--------------------+

Generate Negative Samples

對 implicit feedback 的 ALS 來說,手動加入負樣本(Rui = 0 的樣本)是沒有意義的,因為 missing value / non-observed value 對該演算法來說本來就是 0,表示用戶確實沒有對該物品做出行為,也就是 Pui = 0 沒有偏好,所以 Cui = 1 + alpha x 0 置信度也會比其他正樣本低。不過因為 Spark ML 的 ALS 只會計算 Rui > 0 的項目,所以即便你手動加入了 Rui = 0 或 Rui = -1 的負樣本,對整個模型其實沒有影響。

雖然沒有負樣本你就不能算 area under ROC curve 或是 area under Precision-Recall curve 等 binary classifier 用的指標,不過你可以改用 Learning to rank 的評估方式,例如 NDCG 或 Mean Average Precision 等。但是 ALS 的 loss function 也沒辦法直接優化 NDCG 這樣的指標就是了。

ref:
https://vinta.ws/code/generate-negative-samples-for-recommender-system.html

Split Data

因為 Matrix Factorization 需要考慮每個 user-item pair,如果你餵給 model 它沒見過的資料,它就沒辦法進行推薦(冷啟動問題)。只要 user 或 item 其中之一不存在於 dataset 裡,ALS model 所輸出的 prediction 值就會是 NaN。所以應該盡量保持每個 user 和 item 都出現在 training set 和 testing set 裡,例如隨機挑出每個 user 的任意 n 個或 n 比例的評分作為 test set,剩下的評分當作 training set(俗稱 leave-n-out)。如果使用 Machine Learning 中常見的 holdout 方式,隨機地把所有 data point 分散到 training set 和 test set(例如 df.randomSplit([0.7, 0.3])),會有很高的機率造成部分 user 或 item 只出現在其中一組 dataset 裡。

ref:
https://jessesw.com/Rec-System/
http://blog.ethanrosenthal.com/2016/10/19/implicit-mf-part-1/

從 LibRec 的文件上也可以發現還有許多拆分數據的方式,例如:

  • 基于 Ratio 的分类方法为通过给定的比例来将数据分为两部分。这个分类过程可以在所有数据中进行随机分类,也可以在用户或者物品维度上进行分类。当有时间的特征时,可以根据时间顺序留出最后一定比例的数据来进行测试。
  • LooCV 的分割方法为 leave-one-user/item/rating-out,也就是随机选取每个 user 的任意一个 item 或者每个 item 的任意一个 user 作为测试数据,余下的数据来作为训练数据。在实现中实现了基于 User 和基于 Item 的多种分类方式。
  • GivenN 分割方法是指为每个用户留出指定数目 N 的数据来作为测试用例,余下的样本作为训练数据。
  • KCV 即 K 折交叉验证。将数据分割为 K 份,在每次执行时选择其中一份作为测试数据,余下的数据作为训练数据,共执行 K 次。综合 K 次的训练结果来对推荐算法的性能进行评估。

ref:
https://www.librec.net/dokuwiki/doku.php?id=DataModel_zh#splitter

這裡我們用 sampleBy() 簡單地寫了一個根據 user 來隨機劃分 item 到 training set 和 test set 的方法。

def randomSplitByUser(df, weights, seed=None):
    trainingRation = weights[0]
    fractions = {row['user']: trainingRation for row in df.select('user').distinct().collect()}
    training = df.sampleBy('user', fractions, seed)
    testRDD = df.rdd.subtract(training.rdd)
    test = spark.createDataFrame(testRDD, df.schema)
    return training, test

training, test = randomSplitByUser(ratingDF, weights=[0.7, 0.3])

ref:
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.sampleBy

Train the Model

from pyspark.ml.recommendation import ALS

als = ALS(implicitPrefs=True, seed=42) \
    .setRank(50) \
    .setMaxIter(22) \
    .setRegParam(0.5) \
    .setAlpha(40)

alsModel = als.fit(training)

# 這些就是訓練出來的 user 和 item 的 Latent Factors
alsModel.userFactors.show()
alsModel.itemFactors.show()

ref:
https://spark.apache.org/docs/latest/ml-collaborative-filtering.html
https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.recommendation.ALS

Predict Preferences

from pyspark.ml import Transformer

predictedDF = alsModel.transform(testing)

class PredictionProcessor(Transformer):

    def _transform(self, predictedDF):
        nonNullDF = predictedDF.dropna(subset=['prediction', ])
        predictionDF = nonNullDF.withColumn('prediction', nonNullDF['prediction'].cast('double'))
        return predictionDF

# 刪掉那些 NaN 的數據
predictionProcessor = PredictionProcessor()
predictionDF = predictionProcessor.transform(predictedDF)

Evaluate the Model

因為 Spark ML 沒有提供給 DataFrame 用的 ranking evaluator,我們只好自己寫一個,但是內部還是使用 Spark MLlib 的 RankingMetrics。不過這個只是 offline 的評估方式而已,等到要實際上線的時候可能還需要做 A/B testing。

from pyspark import keyword_only
from pyspark.ml.evaluation import Evaluator
from pyspark.ml.param.shared import Param
from pyspark.mllib.evaluation import RankingMetrics
from pyspark.sql import Window
from pyspark.sql.functions import col
from pyspark.sql.functions import expr
import pyspark.sql.functions as F

class RankingEvaluator(Evaluator):

    @keyword_only
    def __init__(self, k=None):
        super(RankingEvaluator, self).__init__()
        self.k = Param(self, 'k', 'Top K')
        self._setDefault(k=30)
        kwargs = self.__init__._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, k=None):
        kwargs = self.setParams._input_kwargs
        return self._set(**kwargs)

    def isLargerBetter(self):
        return True

    def setK(self, value):
        self._paramMap[self.k] = value
        return self

    def getK(self):
        return self.getOrDefault(self.k)

    def _evaluate(self, predictedDF):
        k = self.getK()

        predictedDF.show()

        windowSpec = Window.partitionBy('user').orderBy(col('prediction').desc())
        perUserPredictedItemsDF = predictedDF \
            .select('user', 'item', 'prediction', F.rank().over(windowSpec).alias('rank')) \
            .where('rank <= {0}'.format(k)) \
            .groupBy('user') \
            .agg(expr('collect_list(item) as items'))

        windowSpec = Window.partitionBy('user').orderBy(col('starred_at').desc())
        perUserActualItemsDF = predictedDF \
            .select('user', 'item', 'starred_at', F.rank().over(windowSpec).alias('rank')) \
            .where('rank <= {0}'.format(k)) \
            .groupBy('user') \
            .agg(expr('collect_list(item) as items'))

        perUserItemsRDD = perUserPredictedItemsDF.join(F.broadcast(perUserActualItemsDF), 'user', 'inner') \
            .rdd \
            .map(lambda row: (row[1], row[2]))

        if perUserItemsRDD.isEmpty():
            return 0.0

        rankingMetrics = RankingMetrics(perUserItemsRDD)
        metric = rankingMetrics.ndcgAt(k)
        return metric

k = 30
rankingEvaluator = RankingEvaluator(k=k)
ndcg = rankingEvaluator.evaluate(predictionDF)
print('NDCG', ndcg)

ref:
https://spark.apache.org/docs/latest/mllib-evaluation-metrics.html

Recommend Items

實際感受一下推薦系統的效果如何。這裡是直接把結果 print 出來,而沒有把推薦結果儲存到資料庫。不過通常不會直接就把推薦系統輸出的東西展示給用戶,會先經過一些過濾、排序和產生推薦理由等等的步驟,或是加入一些人為的規則,比如說強制插入廣告、最近主打的商品或是過濾掉那些很多人點擊但是其實質量並不怎麼樣的東西。當然也有可能會把這個推薦系統的輸出作為其他機器學習 model 的輸入。

ref:
https://www.zhihu.com/question/28247353

def recommendItems(rawDF, alsModel, username, topN=30, excludeKnownItems=False):
    userID = rawDF \
        .where('from_username = "{0}"'.format(username)) \
        .select('from_user_id') \
        .take(1)[0]['from_user_id']

    userItemsDF = alsModel \
        .itemFactors. \
        selectExpr('{0} AS user'.format(userID), 'id AS item')
    if excludeKnownItems:
        userKnownItemsDF = rawDF \
            .where('from_user_id = {0}'.format(userID)) \
            .selectExpr('repo_id AS item')
        userItemsDF = userItemsDF.join(userKnownItemsDF, 'item', 'left_anti')

    userPredictedDF = alsModel \
        .transform(userItemsDF) \
        .select('item', 'prediction') \
        .orderBy('prediction', ascending=False) \
        .limit(topN)

    repoDF = rawDF \
        .groupBy('repo_id', 'repo_full_name', 'repo_language') \
        .agg(F.max('stargazers_count').alias('stargazers_count'))

    recommendedItemsDF = userPredictedDF \
        .join(repoDF, userPredictedDF['item'] == repoDF['repo_id'], 'inner') \
        .select('prediction', 'repo_full_name', 'repo_language', 'stargazers_count') \
        .orderBy('prediction', ascending=False)

    return recommendedItemsDF

k = 30
username = 'vinta'
recommendedItemsDF = recommendItems(rawDF, alsModel, username, topN=k, excludeKnownItems=False)
for item in recommendedItemsDF.collect():
    repoName = item['repo_full_name']
    repoUrl = 'https://github.com/{0}'.format(repoName)
    print(repoUrl, item['prediction'], item['repo_language'], item['stargazers_count'])

ref:
https://github.com/vinta/albedo/blob/master/src/main/python/train_als.ipynb

Cross-validate Models

使用 Spark ML 的 pipeline 來做 cross-validation,選出最適合的 hyperparameters 組合。

  • rank: The number of latent factors in the model, or equivalently, the number of columns k in the user-feature and product-feature matrices.
  • regParam: A standard overfitting parameter, also usually called lambda. Higher values resist overfitting, but values that are too high hurt the factorization’s accuracy.
  • alpha: Controls the relative weight of observed versus unobserved user-product interactions in the factorization.
  • maxIter: The number of iterations that the factorization runs. More iterations take more time but may produce a better factorization.
dataCleaner = DataCleaner()

als = ALS(implicitPrefs=True, seed=42)

predictionProcessor = PredictionProcessor()

pipeline = Pipeline(stages=[
    dataCleaner,
    als,
    predictionProcessor,
])

paramGrid = ParamGridBuilder() \
    .addGrid(dataCleaner.minItemStargazersCount, [1, 10, 100]) \
    .addGrid(dataCleaner.maxItemStargazersCount, [4000, ]) \
    .addGrid(dataCleaner.minUserStarredCount, [1, 10, 100]) \
    .addGrid(dataCleaner.maxUserStarredCount, [1000, 4000, ]) \
    .addGrid(als.rank, [50, 100]) \
    .addGrid(als.regParam, [0.01, 0.1, 0.5]) \
    .addGrid(als.alpha, [0.01, 0.89, 1, 40, ]) \
    .addGrid(als.maxIter, [22, ]) \
    .build()

rankingEvaluator = RankingEvaluator(k=30)

cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=paramGrid,
                    evaluator=rankingEvaluator,
                    numFolds=2)

cvModel = cv.fit(ratingDF)

def printCrossValidationParameters(cvModel):
    metric_params_pairs = list(zip(cvModel.avgMetrics, cvModel.getEstimatorParamMaps()))
    metric_params_pairs.sort(key=lambda x: x[0], reverse=True)
    for pair in metric_params_pairs:
        metric, params = pair
        print('metric', metric)
        for k, v in params.items():
            print(k.name, v)
        print('')

printCrossValidationParameters(cvModel)

ref:
https://spark.apache.org/docs/latest/ml-pipeline.html

Generate negative samples for recommender system?

Generate negative samples for recommender system?

根據「推荐系统实践」,挑選負樣本時應該遵循以下原則:

  • 对每个用户,要保证正负样本的平衡(数目相似)。
  • 对每个用户采样负样本时,要选取那些很热门,而用户却没有行为的物品。
  • 一般认为,很热门而用户却没有行为更加代表用户对这个物品不感兴趣。因为对于冷门的物品,用户可能是压根没在网站中发现这个物品,所以谈不上是否感兴趣。

ref:
http://www.duokan.com/reader/www/app.html?id=ed873c9e323511e28a9300163e0123ac

不過如果你是用 Spark ML 的 ALS(implicitPrefs=True) 的話,並不需要手動加入負樣本。對 implicit feedback 的 ALS 來說,手動加入負樣本(Rui = 0 的樣本)是沒有意義的,因為 missing value / non-observed value 對該演算法來說本來就是 0,表示用戶確實沒有對該物品做出行為,也就是 Pui = 0 沒有偏好,所以 Cui = 1 + alpha x 0 置信度也會比其他正樣本低。不過因為 Spark ML 的 ALS 只會計算 Rui > 0 的項目,所以即便你手動加入了 Rui = 0 或 Rui = -1 的負樣本,對整個模型其實沒有影響。

用以下這三組 dataset 訓練出來的模型都是一樣的:

from pyspark.ml.recommendation import ALS

matrix = [
    (1, 1, 0),
    (1, 2, 1),
    (1, 3, 0),
    (1, 4, 1),
    (1, 5, 1),
    (2, 1, 1),
    (2, 2, 1),
    (2, 3, 0),
    (2, 4, 1),
    (2, 5, 1),
    (3, 1, 1),
    (3, 2, 1),
    (3, 3, 1),
    (3, 4, 1),
    (3, 5, 0),
]
df0 = spark.createDataFrame(matrix, ['user', 'item', 'rating'])

matrix = [
    (1, 1, -1),
    (1, 2, 1),
    (1, 3, -1),
    (1, 4, 1),
    (1, 5, 1),
    (2, 1, 1),
    (2, 2, 1),
    (2, 3, -1),
    (2, 4, 1),
    (2, 5, 1),
    (3, 1, 1),
    (3, 2, 1),
    (3, 3, 1),
    (3, 4, 1),
    (3, 5, -1),
]
df1 = spark.createDataFrame(matrix, ['user', 'item', 'rating'])

matrix = [
    (1, 2, 1),
    (1, 4, 1),
    (1, 5, 1),
    (2, 1, 1),
    (2, 2, 1),
    (2, 4, 1),
    (2, 5, 1),
    (3, 1, 1),
    (3, 2, 1),
    (3, 3, 1),
    (3, 4, 1),
]
df2 = spark.createDataFrame(matrix, ['user', 'item', 'rating'])

als = ALS(implicitPrefs=True, seed=42, nonnegative=False).setRank(7).setMaxIter(15).setRegParam(0.01).setAlpha(40)
alsModel = als.fit(df0)
alsModel.userFactors.select('features').show(truncate=False)
alsModel.itemFactors.select('features').show(truncate=False)

als = ALS(implicitPrefs=True, seed=42, nonnegative=False).setRank(7).setMaxIter(15).setRegParam(0.01).setAlpha(40)
alsModel = als.fit(df1)
alsModel.userFactors.select('features').show(truncate=False)
alsModel.itemFactors.select('features').show(truncate=False)

als = ALS(implicitPrefs=True, seed=42, nonnegative=False).setRank(7).setMaxIter(15).setRegParam(0.01).setAlpha(40)
alsModel = als.fit(df2)
alsModel.userFactors.select('features').show(truncate=False)
alsModel.itemFactors.select('features').show(truncate=False)

ref:
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L1626
https://github.com/apache/spark/commit/b05b3fd4bacff1d8b1edf4c710e7965abd2017a7
https://www.mail-archive.com/[email protected]/msg60240.html
http://apache-spark-user-list.1001560.n3.nabble.com/implicit-ALS-dataSet-td7067.html

Spark ML cookbook (Python)

Spark ML cookbook (Python)

Calculate percentage of sparsity of a user-item rating DataFrame

result = ratingDF.agg(F.count('rating'), F.countDistinct('user'), F.countDistinct('item')).collect()[0]
totalUserCount = result['count(DISTINCT user)']
totalItemCount = result['count(DISTINCT item)']
zonZeroRatingCount = result['count(rating)']

density = (zonZeroRatingCount / (totalUserCount * totalItemCount)) * 100
sparsity = 100 - density

Recommend items for single user

topN = 30
userID = 123

userItemsDF = alsModel \
    .itemFactors. \
    selectExpr('{0} AS user'.format(userID), 'id AS item')
userPredictedDF = alsModel \
    .transform(userItemsDF) \
    .select('item', 'prediction') \
    .orderBy('prediction', ascending=False) \
    .limit(topN)

ref:
https://www.safaribooksonline.com/library/view/advanced-analytics-with/9781491972946/ch03.html
https://www.safaribooksonline.com/library/view/building-a-recommendation/9781785282584/ch06s04.html
https://www.safaribooksonline.com/library/view/building-recommendation-engines/9781785884856/ch07s05.html

Recommend items for every user (a slightly fast way)

import numpy
from numpy import *

myModel = MatrixFactorizationModel.load(sc, 'BingBong')
m1 = myModel.productFeatures()
m2 = m1.map(lambda (product, feature): feature).collect()
m3 = matrix(m2).transpose()
pf = sc.broadcast(m3)
uf = myModel.userFeatures().coalesce(100)

# get predictions on all user
f1 = uf.map(lambda (userID, features): (userID, squeeze(asarray(matrix(array(features)) * pf.value))))

ref:
https://www.slideshare.net/SparkSummit/26-trillion-app-recomendations-using-100-lines-of-spark-code-ayman-farahat

Evaluate a binary classification

from pyspark.ml.evaluation import BinaryClassificationEvaluator

matrix = [
    (0.5, 1),
    (2.0, 1),
    (0.8, 1),
    (0.2, 0),
    (0.1, 0),
    (0.4, 0),
]
predictions = spark.createDataFrame(matrix, ['prediction', 'label'])
predictions = predictions.withColumn('prediction', predictions['prediction'].cast('double'))

evaluator = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='label', metricName='areaUnderROC')
evaluator.evaluate(predictions)

Calculate ranking metrics

from pyspark.mllib.evaluation import RankingMetrics
from pyspark.sql import Window
from pyspark.sql.functions import col, expr
import pyspark.sql.functions as F

k = 10

windowSpec = Window.partitionBy('user').orderBy(col('prediction').desc())
perUserPredictedItemsDF = outputDF \
    .select('user', 'item', 'prediction', F.rank().over(windowSpec).alias('rank')) \
    .where('rank <= {0}'.format(k)) \
    .groupBy('user') \
    .agg(expr('collect_list(item) as items'))
perUserPredictedItemsDF.show()
# +--------+--------------------+
# |    user|               items|
# +--------+--------------------+
# |    2142|[36560369, 197450...|
# |   47217|[40693501, 643554...|
# +--------+--------------------+

windowSpec = Window.partitionBy('from_user_id').orderBy(col('starred_at').desc())
perUserActualItemsDF = rawDF \
    .select('from_user_id', 'repo_id', 'starred_at', F.rank().over(windowSpec).alias('rank')) \
    .where('rank <= {0}'.format(k)) \
    .groupBy('from_user_id') \
    .agg(expr('collect_list(repo_id) as items')) \
    .withColumnRenamed('from_user_id', 'user')
# +--------+--------------------+
# |    user|               items|
# +--------+--------------------+
# |    2142|[29122050, 634846...|
# |   59990|[9820191, 8729416...|
# +--------+--------------------+

perUserItemsRDD = perUserPredictedItemsDF.join(perUserActualItemsDF, 'user') \
    .rdd \
    .map(lambda row: (row[1], row[2]))
rankingMetrics = RankingMetrics(perUserItemsRDD)

print(rankingMetrics.meanAveragePrecision)
print(rankingMetrics.precisionAt(k))
print(rankingMetrics.ndcgAt(k))

ref:
https://www.safaribooksonline.com/library/view/spark-the-definitive/9781491912201/ch19.html

Create a custom Transformer

from pyspark.ml import Transformer

class PredictionProcessor(Transformer):

    def _transform(self, predictedDF):
        nonNullDF = predictedDF.dropna(subset=['prediction', ])
        predictionDF = nonNullDF.withColumn('prediction', nonNullDF['prediction'].cast('double'))
        return predictionDF

ref:
http://stackoverflow.com/questions/32331848/create-a-custom-transformer-in-pyspark-ml

RankingMetrics

結果介於 0 ~ 1 之間,分數越大越好。

from pyspark.mllib.evaluation import RankingMetrics

predictionAndLabels = sc.parallelize([
    ([1, 2, 3, 4], [1, 2, 3, 4]),
    ([1, ], [1, 10]),
    ([6, 4, 2], [6, 2, 100, 8, 2, 55]),
])
metrics = RankingMetrics(predictionAndLabels)
metrics.ndcgAt(5)

ref:
https://spark.apache.org/docs/latest/mllib-evaluation-metrics.html#ranking-systems

Create a custom Evaluator

from pyspark import keyword_only
from pyspark.ml.evaluation import Evaluator
from pyspark.ml.param.shared import Param
from pyspark.mllib.evaluation import RankingMetrics
from pyspark.sql import Window
from pyspark.sql.functions import col
from pyspark.sql.functions import expr
import pyspark.sql.functions as F

class RankingEvaluator(Evaluator):

    @keyword_only
    def __init__(self, k=None):
        super(RankingEvaluator, self).__init__()
        self.k = Param(self, 'k', 'Top K')
        self._setDefault(k=30)
        kwargs = self.__init__._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, k=None):
        kwargs = self.setParams._input_kwargs
        return self._set(**kwargs)

    def isLargerBetter(self):
        return True

    def setK(self, value):
        self._paramMap[self.k] = value
        return self

    def getK(self):
        return self.getOrDefault(self.k)

    def _evaluate(self, outputDF):
        k = self.getK()

        windowSpec = Window.partitionBy('user').orderBy(col('prediction').desc())
        perUserPredictedItemsDF = outputDF \
            .select('user', 'item', 'prediction', F.rank().over(windowSpec).alias('rank')) \
            .where('rank <= {0}'.format(k)) \
            .groupBy('user') \
            .agg(expr('collect_list(item) as items'))

        windowSpec = Window.partitionBy('user').orderBy(col('starred_at').desc())
        perUserActualItemsDF = outputDF \
            .select('user', 'item', 'starred_at', F.rank().over(windowSpec).alias('rank')) \
            .where('rank <= {0}'.format(k)) \
            .groupBy('user') \
            .agg(expr('collect_list(item) as items'))

        perUserItemsRDD = perUserPredictedItemsDF.join(F.broadcast(perUserActualItemsDF), 'user', 'inner') \
            .rdd \
            .map(lambda row: (row[1], row[2]))
        rankingMetrics = RankingMetrics(perUserItemsRDD)
        metric = rankingMetrics.ndcgAt(k)
        return metric

Show best parameters from a CrossValidatorModel

metric_params_pairs = list(zip(cvModel.avgMetrics, cvModel.getEstimatorParamMaps()))
metric_params_pairs.sort(key=lambda x: x[0], reverse=True)
best_metric_params = metric_params_pairs[0][1]
for pair in metric_params_pairs:
    metric, params = pair
    print('metric', metric)
    for k, v in params.items():
        print(k.name, v)
    print('')
# metric 0.5273636632856705
# rank 50
# regParam 0.01
# maxIter 10
# alpha 1

ref:
https://stackoverflow.com/questions/31749593/how-to-extract-best-parameters-from-a-crossvalidatormodel
https://stackoverflow.com/questions/39529012/pyspark-get-all-parameters-of-models-created-with-paramgridbuilder

Show best parameters from a TrainValidationSplitModel

metric_params_pairs = list(zip(tvModel.validationMetrics, tvModel.getEstimatorParamMaps()))
metric_params_pairs.sort(key=lambda x: x[0], reverse=True)
for pair in metric_params_pairs:
    metric, params = pair
    print('metric', metric)
    for k, v in params.items():
        print(k.name, v)
    print('')
# metric 0.5385481418189484
# rank 50
# regParam 0.1
# maxIter 20
# alpha 1