Setup Spark, Scala and Maven with Intellij IDEA

Setup Spark, Scala and Maven with Intellij IDEA

IntelliJ IDEA supports Scala and Apache Spark perfectly. You're able to browse a complete Spark project built with IntelliJ IDEA on GitHub: https://github.com/vinta/albedo

Useful Plugins:

Initiate a Maven Project

$ mvn archetype:generate
Choose a number: xxx
xxx: remote -> net.alchim31.maven:scala-archetype-simple

ref:
https://docs.scala-lang.org/tutorials/scala-with-maven.html

Example Configurations

The remaining section of this article assumes that you use this pom.xml which should be able to work out of the box.

<!-- in pom.xml -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>ws.vinta</groupId>
  <artifactId>albedo</artifactId>
  <version>1.0.0-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>${project.artifactId}</name>
  <description>A recommender system for discovering GitHub repos</description>
  <url>https://github.com/vinta/albedo</url>
  <inceptionYear>2017</inceptionYear>
  <properties>
    <java.version>1.8</java.version>
    <scala.version>2.11.8</scala.version>
    <scala.compactVersion>2.11</scala.compactVersion>
    <spark.version>2.2.0</spark.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
  <repositories>
    <repository>
      <id>spark-packages</id>
      <name>Spark Packages Repository</name>
      <url>https://dl.bintray.com/spark-packages/maven/</url>
    </repository>
  </repositories>
  <dependencies>
    <!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.compactVersion}</artifactId>
      <version>${spark.version}</version>
      <scope>compile</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.11 -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_${scala.compactVersion}</artifactId>
      <version>${spark.version}</version>
      <scope>compile</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-mllib_2.11 -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-mllib_${scala.compactVersion}</artifactId>
      <version>${spark.version}</version>
      <scope>compile</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.42</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.httpcomponents/httpclient -->
    <dependency>
      <groupId>org.apache.httpcomponents</groupId>
      <artifactId>httpclient</artifactId>
      <version>4.5.2</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.elasticsearch.client/elasticsearch-rest-high-level-client -->
    <dependency>
      <groupId>org.elasticsearch.client</groupId>
      <artifactId>elasticsearch-rest-high-level-client</artifactId>
      <version>5.6.2</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.hankcs/hanlp -->
    <dependency>
      <groupId>com.hankcs</groupId>
      <artifactId>hanlp</artifactId>
      <version>portable-1.3.4</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.github.rholder/snowball-stemmer -->
    <dependency>
      <groupId>com.github.rholder</groupId>
      <artifactId>snowball-stemmer</artifactId>
      <version>1.3.0.581.1</version>
    </dependency>
  </dependencies>
  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.6.1</version>
        <configuration>
          <source>${java.version}</source>
          <target>${java.version}</target>
          <encoding>UTF-8</encoding>
        </configuration>
      </plugin>
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.1</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-install-plugin</artifactId>
        <version>2.5.2</version>
        <configuration>
          <skip>true</skip>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.1.0</version>
        <executions>
          <execution>
           <phase>install</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <filters>
                <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                    <exclude>META-INF/*.SF</exclude>
                  </excludes>
                </filter>
              </filters>
              <artifactSet>
                <excludes>
                  <exclude>com.apple:AppleJavaExtensions:*</exclude>
                  <exclude>javax.servlet:*</exclude>
                  <exclude>org.apache.hadoop:*</exclude>
                  <exclude>org.apache.maven.plugins:*</exclude>
                  <exclude>org.apache.parquet:*</exclude>
                  <exclude>org.apache.spark:*</exclude>
                  <exclude>org.scala-lang:*</exclude>
                </excludes>
              </artifactSet>
              <finalName>${project.artifactId}-${project.version}-uber</finalName>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>

ref:
https://davidb.github.io/scala-maven-plugin/example_compile.html

Generate a Thin JAR

Thin JAR only contains classes that you created, which means you should include your dependencies externally.

$ mvn clean package -DskipTests

You're able to specify different classes in the same JAR.

$ spark-submit \
--master spark://localhost:7077 \
--packages "mysql:mysql-connector-java:5.1.41" \
--class ws.vinta.albedo.LogisticRegressionRanker \
target/albedo-1.0.0-SNAPSHOT.jar

ref:
https://stackoverflow.com/questions/1082580/how-to-build-jars-from-intellij-properly
https://spark.apache.org/docs/latest/submitting-applications.html

Generate a Fat JAR, Shaded JAR or Uber JAR

CAUTION: DO NOT ENABLE <minimizeJar>true</minimizeJar> in the maven-shade-plugin, it will ruin your day!

<!-- in pom.xml -->
<project>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-install-plugin</artifactId>
        <version>2.5.2</version>
        <configuration>
          <skip>true</skip>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.1.0</version>
        <executions>
          <execution>
            <phase>install</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <artifactSet>
                <excludes>
                  <exclude>com.apple:AppleJavaExtensions:*</exclude>
                  <exclude>javax.servlet:*</exclude>
                  <exclude>org.apache.hadoop:*</exclude>
                  <exclude>org.apache.maven.plugins:*</exclude>
                  <exclude>org.apache.parquet:*</exclude>
                  <exclude>org.apache.spark:*</exclude>
                  <exclude>org.scala-lang:*</exclude>
                </excludes>
              </artifactSet>
              <finalName>${project.artifactId}-${project.version}-uber</finalName>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
# the output jar will be located in "target/albedo-1.0.0-SNAPSHOT-uber.jar"
$ mvn clean install -DskipTests
$ spark-submit \
--master spark://localhost:7077 \
--class ws.vinta.albedo.LogisticRegressionRanker \
target/albedo-1.0.0-SNAPSHOT-uber.jar

ref:
http://maven.apache.org/plugins/maven-shade-plugin/examples/includes-excludes.html
https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/troubleshooting/missing_dependencies_in_jar_files.html

Run a Spark Application in Local Mode

Follow "Run > Edit Configurations":

  • VM options: -Xms12g -Xmx12g -Dspark.master="local[*]"
  • Before launch:
    • Build

Or

// in YourSparkApp.scala
package ws.vinta.albedo

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object YourSparkApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .set("spark.driver.memory", "12g")

    val spark = SparkSession
      .builder()
      .appName("YourSparkApp")
      .config(conf)
      .getOrCreate()

    spark.stop()
  }
}

ref:
https://github.com/jaceklaskowski/mastering-apache-spark-book/blob/master/spark-sparkcontext-creating-instance-internals.adoc
https://stackoverflow.com/questions/43054268/how-to-set-spark-memorystore-size-when-running-in-intellij-scala-console

Run a Spark Application in Standalone Mode

First, start your Spark Standalone cluster:

$ cd ${SPARK_HOME}
$ ./sbin/start-master.sh -h 0.0.0.0
$ ./sbin/start-slave.sh spark://localhost:7077

# print logs from Spark master and workers, useful for debuging
$ tail -f ${SPARK_HOME}/logs/*

Follow "Run > Edit Configurations":

  • VM options: -Dspark.master=spark://localhost:7077 -Dspark.driver.memory=2g -Dspark.executor.memory=12g -Dspark.executor.cores=3
    • Local cluster mode: -Dspark.master="local-cluster[x, y, z]" with x workers, y cores per worker, and z MB memory per worker
    • Local cluster mode doesn't need a real Spark Standalone cluster
  • Before launch:
    • Build
    • Run Maven Goal 'albedo: clean install'

Or

// in YourSparkApp.scala
package ws.vinta.albedo

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object YourSparkApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("spark://localhost:7077")
      .set("spark.driver.memory", "2g")
      .set("spark.executor.memory", "12g")
      .set("spark.executor.cores", "3")
      .setJars(List("target/albedo-1.0.0-SNAPSHOT-uber.jar"))
      // or
      .setMaster("local-cluster[1, 3, 12288]")
      .setJars(List("target/albedo-1.0.0-SNAPSHOT-uber.jar"))

    val spark = SparkSession
      .builder()
      .appName("YourSparkApp")
      .config(conf)
      .getOrCreate()

    spark.stop()
  }
}

In the end, there are some glossaries which need to be clarified:

  • compile: compile a single .java or .scala into *.class
  • make: compile changed files only
  • build: compile every files in the project

ref:
http://www.jianshu.com/p/b4e4658c459c

Specify a Custom Logging Configuration

$ cd PROJECT_ROOT
$ cp $SPARK_HOME/conf/log4j.properties.template log4j.properties

Follow "Run > Edit Configurations":

  • VM options: -Dlog4j.configuration=file:./log4j.properties

ref:
https://spark.apache.org/docs/latest/sql-programming-guide.html#getting-started
https://stackoverflow.com/questions/43054268/how-to-set-spark-memorystore-size-when-running-in-intellij-scala-console
https://community.cloudera.com/t5/Advanced-Analytics-Apache-Spark/Config-log4j-in-Spark/td-p/34968

Feature Engineering 特徵工程中常見的方法

Feature Engineering 特徵工程中常見的方法

Feature Engineering 是把 raw data 轉換成 features 的整個過程的總稱。基本上特徵工程就是個手藝活,講求的是創造力。

本文不定期更新中。

Missing Value Imputation

最簡單暴力的做法當然就是直接 drop 掉那些含有缺失值的 rows。

針對 numerical 特徵的缺失值,可以用以下方式取代:

  • 0,缺點是可能會混淆其他本來就是 0 的數值
  • -999,用某個正常情況下不會出現的數值代替,但是選得不好可能會變成異常值,要特別對待
  • Mean,平均數
  • Median,中位數,跟平均數相比,不會被異常值干擾

針對 categorical 特徵的缺失值,可以用以下方式取代:

  • Mode,眾數,最常見的值
  • 改成 "Others" 之類的值

假設你要填補 age 這個特徵,然後你有其他例如 gender 這樣的特徵,你可以分別計算男性和女性的 age 的 mean、median 和 mode 來填補缺失值;更複雜一點的方式是,你可以把沒有缺失值的數據挑出來,用它們來訓練一個 regression 或 classification 模型,用這個模型來預測缺失值。

不過其實有些演算法是可以容許缺失值的,這時候可以新增一個 has_missing_value 欄位(稱為 NA indicator column)。

ref:
http://adataanalyst.com/machine-learning/comprehensive-guide-feature-engineering/
https://stats.stackexchange.com/questions/28860/why-adding-an-na-indicator-column-instead-of-value-imputation-for-randomforest

Outliers Detection

發現離群值最直觀的方式就是畫圖表,針對單一特徵可以使用 box plot;兩兩特徵則可以使用 scatter plot。

處置離群值的方式通常是直接刪除或是做變換(例如 log transformation 或 binning),當然你也可以套用處理缺失值的方式。

ref:
https://www.analyticsvidhya.com/blog/2016/01/guide-data-exploration/
https://www.douban.com/note/413022836/

Duplicate Entries Removal

duplicate 或 redundant 尤其指的是那些 features 都一樣,但是 target variable 卻不同的數據。

Feature Scaling 特徵縮放

Standardization 標準化

原始資料中,因為各個特徵的含義和單位不同,每個特徵的取值範圍可能會差異很大。例如某個二元特徵的範圍是 0 或 1,另一個價格特徵的範圍可能是 [0, 1000000],由於取值範圍相差過大導致了模型可能會更偏向於取值範圍較大的那個特徵。解決的辦法就是把各種不同 scale 的特徵轉換成同樣的 scale,稱為標準化或正規化。

狹義來說,標準化專門指的是透過計算 z-score,讓數據的 mean 為 0、 variance 為 1。

ref:
https://spark.apache.org/docs/latest/ml-features.html#standardscaler
http://scikit-learn.org/stable/modules/preprocessing.html#standardization-or-mean-removal-and-variance-scaling
https://www.quora.com/How-bad-is-it-to-standardize-dummy-variables

Normalization 歸一化、正規化

歸一化是指把每個樣本縮放到單位範數(每個樣本的範數為 1),適用於計算 dot product 或者兩個樣本之間的相似性。除了標準化、歸一化之外,其他還有透過最大、最小值,把數據的範圍縮放到 [0, 1] 或 [-1, 1] 的區間縮放法,不過這個方法容易受異常值的影響。

標準化是分別對單一特徵進行(針對 column);歸一化是對每個 observation 進行(針對 row)。

對 SVM、logistic regression 或其他使用 squared loss function 的演算法來說,需要 standardization;對 Vector Space Model 來說,需要 normalization;至於 tree-based 的演算法,基本上都不需要標準化或歸一化,它們對 scale 不敏感。

ref:
https://spark.apache.org/docs/latest/ml-features.html#normalizer
http://scikit-learn.org/stable/modules/preprocessing.html#normalization
https://www.qcloud.com/community/article/689521

Feature Transformation 特徵變換

以下適用 continuous 特徵:

Rounding

某些精度有到小數點後第 n 位的特徵,如果你其實不需要那麼精確,可以考慮 round(value * m)round(log(value)) 這樣的做法,甚至可以把 round 之後的數值當成 categorical 特徵。

confidence  round(confidence * 10)
0.9594      10
0.1254      1
0.1854      2
0.5454      5
0.3655      4

Log Transformation

因為 x 越大,log(x) 增長的速度就越慢,所以取 log 的意義是可以 compress 大數和 expand 小數,換句話說就是壓縮 "long tail" 和展開 "head"。假設 x 原本的範圍是 [100, 1000],log(x, 10) 之後的範圍就變成 [2, 3] 了。也常常使用 log(1 + x)log(x / (1 - x))

另外一種類似的做法是 square root 平方根或 cube root 立方根(可以用在負數)。

ref:
https://www.safaribooksonline.com/library/view/mastering-feature-engineering/9781491953235/ch02.html

Binarization 二值化

對數值型的數據設定一個 threshold,大於就賦值為 1、小於就賦值為 0。例如 score,如果你只關心「及格」或「不及格」,可以直接把成績對應到 1(score >= 60)和 0(score < 60)。或是你要做啤酒銷量分析,你可以新增一個 age >= 18 的特徵來標示出已成年。

你有一個 color 的 categorical 特徵,如果你不在乎實際上是什麼顏色的話,其實也可以改成 has_color

ref:
https://spark.apache.org/docs/latest/ml-features.html#binarizer

Binning

也稱為 bucketization。

age 這樣的特徵為例,你可以把所有年齡拆分成 n 段,0-20 歲、20-40 歲、40-60 歲等或是 0-18 歲、18-40 歲、40-70 歲等(等距或等量),然後把個別的年齡對應到某一段,假設 26 歲是對應到第二個 bucket,那新特徵的值就是 2。這種方式是人為地指定每個 bucket 的邊界值,還有另外一種拆分法是根據數據的分佈來拆,稱為 quantization 或 quantile binning,你只需要指定 bucket 的數量即可。

同樣的概念應用到其他地方,可以把 datetime 特徵拆分成上午、中午、下午和晚上;如果是 categorical 特徵,則可以先 SELECT count() ... GROUP BY,然後把出現次數小於某個 threshold 的值改成 "Other" 之類的。或者是你有一個 occupation 特徵,如果你其實不需要非常準確的職業資訊的話,可以把 "Web Developer"、"iOS Developer" 或 "DBA" 這些個別的資料都改成 "Software Engineer"。

binarization 和 binning 都是對 continuous 特徵做 discretization 離散化,增強模型的非線性泛化能力。

ref:
https://spark.apache.org/docs/latest/ml-features.html#bucketizer
https://spark.apache.org/docs/latest/ml-features.html#quantilediscretizer
https://github.com/collectivemedia/spark-ext#optimal-binning
https://www.qcloud.com/community/article/689521

以下適用 categorical 特徵:

ref:
https://en.wikipedia.org/wiki/Categorical_variable
https://www.safaribooksonline.com/library/view/introduction-to-machine/9781449369880/ch04.html

Integer Encoding

也稱為 label encoding。

把每個 category 對應到數字,一種做法是隨機對應到 0, 1, 2, 3, 4 等數字;另外一種做法是依照該值出現的頻率大小的順序來給值,例如最常出現的值給 0,依序給 1, 2, 3 等等。如果是針對一些在某種程度上有次序的 categorical 特徵(稱為 ordinal),例如「鑽石會員」「白金會員」「黃金會員」「普通會員」,直接 mapping 成數字可能沒什麼問題,但是如果是類似 colorcity 這樣的沒有明顯大小的特徵的話,還是用 one-hot encoding 比較合適。不過如果用的是 tree-based 的演算法就無所謂了。

有些 categorical 特徵也可能會用數字表示(例如 id),跟 continuous 特徵的差別是,數值的差異或大小對 categorical 特徵來說沒有太大的意義。

ref:
http://breezedeus.github.io/2014/11/15/breezedeus-feature-processing.html
http://phunters.lofter.com/post/86d56_194e956

One-hot Encoding (OHE)

如果某個特徵有 m 種值(例如 Taipei, Beijing, Tokyo),那它 one-hot encode 之後就會變成長度為 m 的向量:

city    city_Taipei city_Beijing city_tokyo
Taipei  1           0            0
Beijing 0           1            0
Tokyo   0           0            1

你也可以改用 Dummy coding,這樣就只需要產生長度為 m -1 的向量:

city    city_Taipei city_Beijing
Taipei  1           0
Beijing 0           1
Tokyo   0           0

OHE 的缺點是容易造成特徵的維度大幅增加和沒辦法處理之前沒見過的值。

ref:
http://scikit-learn.org/stable/modules/preprocessing.html#preprocessing-categorical-features
https://blog.myyellowroad.com/using-categorical-data-in-machine-learning-with-python-from-dummy-variables-to-deep-category-66041f734512

Bin-counting

例如在 Computational Advertising 中,如果你有針對每個 user 的「廣告曝光數(包含點擊和未點擊)」和「廣告點擊數」,你就可以算出每個 user 的「點擊率」,然後用這個機率來表示每個 user,反之也可以對 ad id 使用類似的做法。

ad_id   ad_views  ad_clicks  ad_ctr
412533  18339     1355       0.074
423334  335       12         0.036
345664  1244      132        0.106
349833  35387     1244       0.035

ref:
https://blogs.technet.microsoft.com/machinelearning/2015/02/17/big-learning-made-easy-with-counts/

換個思路,如果你有一個 brand 的特徵,然後你可以從 user 的購買記錄中找出購買 A 品牌的人,有 70% 的人會購買 B 品牌、有 40% 的人會購買 C 品牌;購買 D 品牌的人,有 10% 的人會購買 A 品牌和 E 品牌,你可以每個品牌表示成這樣:

brand  A    B    C    D    E
A      1.0  0.7  0.4  0.0  0.0
B      ...
C      ...
D      0.1  0.0  0.0  1.0  0.1
E      ...

ref:
http://phunters.lofter.com/post/86d56_194e956

LabelCount Encoding

類似 Bin-cunting 的做法,一樣是利用現有的 count 或其他統計上的資料,差別在於 LabelCount Encoding 最後用的是次序而不是數值本身。優點是對異常值不敏感。

ad_id   ad_clicks  ad_rank
412533  1355       1
423334  12         4
345664  132        3
349833  1244       2

ref:
https://www.slideshare.net/gabrielspmoreira/feature-engineering-getting-most-out-of-data-for-predictive-models-tdc-2017/47

Count Vectorization

除了可以用在 text 特徵之外,如果你有 comma-seperated 的 categorical 特徵也可以使用這個方法。例如電影類型 genre,裡頭的值長這樣 Action,Sci-Fi,Drama,就可以先用 RegexTokenizer 轉成 Array("action", "sci-fi", "drama"),再用 CountVectorizer 轉成 vector。

ref:
https://spark.apache.org/docs/latest/ml-features.html#countvectorizer

Feature Hashing

以 user id 為例,透過一個 hash function 把每一個 user id 映射到 (hashed1_, hashed_2, ..., hashed_m) 的某個值。指定 m << user id 的取值範圍,所以缺點是會有 collision(如果你的 model 足夠 robust,倒也是可以不管),優點是可以良好地處理之前沒見過的值和罕見的值。當然不只可以 hash 單一值,也可以 hash 一個 vector。

你可以把 feature hashing 表示為單一欄位的數值(例如 2)或是類似 one-hot encoding 那樣的多欄位的 binary 表示法(例如 [0, 0, 1])。

import hashlib

def hash_func(s, n_bins=100000):
    s = s.encode('utf-8')
    return int(hashlib.md5(s).hexdigest(), 16) % (n_bins - 1) + 1

print(hash_func('some categorical value'))

ref:
https://github.com/apache/spark/pull/18513
https://spark.apache.org/docs/latest/ml-features.html#feature-transformation
https://www.slideshare.net/gabrielspmoreira/feature-engineering-getting-most-out-of-data-for-predictive-models-tdc-2017/42

Mean Encoding

ref:
https://zhuanlan.zhihu.com/p/26308272

Category Embedding

ref:
https://arxiv.org/abs/1604.06737
https://www.slideshare.net/HJvanVeen/feature-engineering-72376750/17
https://blog.myyellowroad.com/using-categorical-data-in-machine-learning-with-python-from-dummy-variables-to-deep-category-42fd0a43b009

User Profile 用戶畫像

使用用戶畫像來表示每個 user id,例如用戶的年齡、性別、職業、收入、居住地、偏好的各種 tag 等,把每個 user 表示成一個 feature vector。除了單一維度的特徵之外,也可以建立「用戶聽過的歌都是哪些曲風」、「用戶(30 天內)瀏覽過的文章都是什麼分類,以 TF-IDF 的方式表達。或者是把用戶所有喜歡文章對應的向量的平均值作為此用戶的 profile。比如某個用戶經常關注與推薦系統有關的文章,那麼他的 profile 中 "CB"、"CF" 和 "推薦系統" 對應的權重值就會較高。

ref:
https://mp.weixin.qq.com/s/w87-dyG9Ap9xJ_HZu0Qn-w
https://medium.com/unstructured/how-feature-engineering-can-help-you-do-well-in-a-kaggle-competition-part-i-9cc9a883514d

Rare Categorical Variables

先計算好每一種 category 的數量,然後把小於某個 threshold 的 category 都改成 "Others" 之類的值。或是使用 clustering 演算法來達到同樣的目的。你也可以直接建立一個新的 binary feature 叫做 rare,要來標示那些相對少見的資料點。

Unseen Categorical Variables

以 Spark ML 為例,當你用 training set 的資料 fit 了一個 StringIndexer(和 OneHotEncoder),把它拿去用在 test set 上時,有一定的機率你會遇到某些 categorical 特徵的值只在 test set 出現,所以對只見過 training set 的 transformer 來說,這些就是所謂的 unseen values。

對付 unseen values 通常有幾種做法:

  • 用整個 training set + test set 來編碼 categorical 特徵
  • 直接捨棄含有 unseen values 的那筆資料
  • 把 unseen values 改成 "Others" 之類的已知值。StringIndexer.setHandleInvalid("keep") 基本上就是這種做法

如果採用第一種方式,一但你把這個 transformer 拿到 production 去用時,無可避免地還是會遇到 unseen values。不過通常線上的 feature engineering 會有別的做法,例如事先把 user 或 item 的各項特徵都算好(定期更新或是 data 產生的時候觸發),然後以 id 為 key 存進 Redis 之類的 NoSQL 裡,model 要用的時候直接用 user id / item id 拿到處理好的 feature vector。

ref:
https://stackoverflow.com/questions/34681534/spark-ml-stringindexer-handling-unseen-labels

Large Categorical Variables

針對那種非常大的 categorical 特徵(例如 id 類的特徵),如果你用的是 logistic regression,其實可以硬上 one-hot encoding。不然就是利用上面提到的 feature hashing 或 bin counting 等方式;如果是 GBDT 的話,甚至可以直接用 id 硬上,只要 tree 足夠多。

ref:
https://www.zhihu.com/question/34819617

Feature Construction 特徵建構

特徵構建指的是從原有的特徵中,人工地創造出新的特徵,通常用來解決一般的線性模型沒辦法學到非線性特徵的問題。其中一個重點可能是能不能夠過某些辦法,在特徵中加入某些「額外的資訊」,雖然也得小心數據偏見的問題。

如果你有很多 user 購物的資料,除了可以 aggregate 得到 total spend 這樣的 feature 之外,也可以變換一下,變成 spend in last weekspend in last monthspend in last year 這種可以表示「趨勢」的特徵。

範例:

  • author_avg_page_view: 該文章作者的所有文章的平均瀏覽數
  • user_visited_days_since_doc_published: 該文章發布到該用戶訪問經過了多少天
  • user_history_doc_sim_categories: 用戶讀過的所有文章的分類和該篇文章的分類的 TF-IDF 的相似度
  • user_history_doc_sim_topics: 用戶讀過的所有文章的內文和該篇文章的內文的 TF-IDF 的相似度

ref:
https://medium.com/unstructured/how-feature-engineering-can-help-you-do-well-in-a-kaggle-competition-part-i-9cc9a883514d
https://www.safaribooksonline.com/library/view/large-scale-machine/9781785888748/ch04s02.html
https://www.slideshare.net/HJvanVeen/feature-engineering-72376750/23

Temporal Features 時間特徵

對於 date / time 類型的資料,除了轉換成 timestamp 和取出 day、month 和 year 做成新的欄位之外,也可以對 hour 做 binning(分成上午、中午、晚上之類的)或是對 day 做 binning(分成工作日、週末);或是想辦法查出該日期當天的天氣、節日或活動等訊息,例如 is_national_holidayhas_sport_events

更進一步,用 datetime 類的資料通常也可以做成 spend_hours_last_weekspend_money_last_week 這種可以用來表示「趨勢」的特徵。

Text Features 文字特徵

ref:
https://www.slideshare.net/HJvanVeen/feature-engineering-72376750/57

Spatial Features 地理特徵

如果你有 cityaddress 等特徵,可以新建出 latitudelongitude 兩個 features(當然你得透過外部的 API 或資料來源才做得到),再組合出 median_income_within_2_miles 這樣的特徵。

ref:
https://www.slideshare.net/HJvanVeen/feature-engineering-72376750/47

Cyclical Features

ref:
http://blog.davidkaleko.com/feature-engineering-cyclical-features.html

Features Interaction 特徵交互

假設你有 AB 兩個 continuous 特徵,你可以用 A + BA - BA * BA / B 之類的方式建立新的特徵。例如 house_age_at_purchase = house_built_date - house_purchase_date 或是 click_through_rate = n_clicks / n_impressions

還有一種類似的作法叫 Polynomial Expansion 多項式展開,當 degree 為 2 時,可以把 (x, y) 兩個特徵變成 (x, x * x, y, x * y, y * y) 五個特徵。

ref:
https://spark.apache.org/docs/latest/ml-features.html#polynomialexpansion
https://elitedatascience.com/feature-engineering-best-practices

Feature Combination 特徵組合

也稱為特徵交叉。

特徵組合主要是針對 categorical 特徵,特徵交互則是適用於 continuous 特徵。但是兩者的概念是差不多的,就是把兩個以上的特徵透過某種方式結合在一起,變成新的特徵。通常用來解決一般的線性模型沒辦法學到非線性特徵的問題。

假設有 genderwealth 兩個特徵,分別有 2 和 3 種取值,最簡單的方式就是直接 string concatenation 組合出一個新的特徵 gender_wealth,共有 2 x 3 = 6 種取值。因為是 categorical 特徵,可以直接對 gender_wealth 使用 StringIndexerOneHotEncoder。你當然也可以一起組合 continuous 和 categorical 特徵,例如 age_wealth 這樣的特徵,只是 vector 裡的值就不是 0 1 而是 age 本身了。

假設 C 是 categorical 特徵,N 是 continuous 特徵,以下有幾種有意義的組合:

  • median(N) GROUP BY C 中位數
  • mean(N) GROUP BY C 算術平均數
  • mode(N) GROUP BY C 眾數
  • min(N) GROUP BY C 最小值
  • max(N) GROUP BY C 最大值
  • std(N) GROUP BY C 標準差
  • var(N) GROUP BY C 方差
  • N - median(N) GROUP BY C
user_id  age  gender  wealth  gender_wealth  gender_wealth_ohe   age_wealth
1        56   male    rich    male_rich      [1, 0, 0, 0, 0, 0]  [56, 0, 0]
2        30   male    middle  male_middle    [0, 1, 0, 0, 0, 0]  [0, 30, 0]
3        19   female  rich    female_rich    [0, 0, 0, 1, 0, 0]  [19, 0, 0]
4        62   female  poor    female_poor    [0, 0, 0, 0, 0, 1]  [0, 0, 62]
5        78   male    poor    male_poor      [0, 0, 1, 0, 0, 0]  [0, 0, 78]
6        34   female  middle  female_middle  [0, 0, 0, 0, 1, 0]  [0, 34, 0]

ref:
http://breezedeus.github.io/2014/11/15/breezedeus-feature-processing.html
http://phunters.lofter.com/post/86d56_194e956
https://zhuanlan.zhihu.com/p/26444240
http://blog.csdn.net/mytestmy/article/details/40933235

Feature Extraction 特徵提取

通常就是指 dimensionality reduction。

  • Principal Component Analysis (PCA)
  • Latent Dirichlet Allocation (LDA)
  • Latent Semantic Analysis (LSA)

Feature Selection 特徵選擇

特徵選擇是指透過某些方法自動地從所有的特徵中挑選出有用的特徵。

ref:
http://scikit-learn.org/stable/modules/feature_selection.html

Filter Method

採用某一種評估指標(發散性、相關性或 Information Gain 等),單獨地衡量個別特徵跟 target variable 之間的關係,常用的方法有 Chi Square Test(卡方檢驗)。這種特徵選擇方式沒有任何模型的參與。

以相關性來說,也不見得跟 target variable 的相關性越高就越好,

ref:
https://spark.apache.org/docs/latest/ml-features.html#chisqselector
http://files.cnblogs.com/files/XBWer/%E6%9C%BA%E5%99%A8%E5%AD%A6%E4%B9%A0%E3%81%AE%E7%89%B9%E5%BE%81.pdf

Wrapper Method

會採用某個模型來預測你的 target variable,把特徵選擇想成是一個組合優化的問題,想辦法找出一組特徵子集能夠讓模型的評估結果最好。缺點是太耗時間了,實務上不常用。

ref:
http://www.cnblogs.com/heaad/archive/2011/01/02/1924088.html

Embedded Method

通常會採用一個會為特徵賦予 coefficients 或 importances 的演算法,例如 Logistic Regression(特別是使用 L1 penalty)或 GBDT,直接用權重或重要性對所有特徵排序,然後取前 n 個作為特徵子集。

ref:
http://scikit-learn.org/stable/modules/feature_selection.html#feature-selection-using-selectfrommodel
https://www.zhihu.com/question/28641663

Feature Learning 特徵學習

也稱為 Representation Learning 或 Automated Feature Engineering。

  • GBDT
  • Neural Network: Restricted Boltzmann Machines
  • Deep Learning: Autoencoder

ref:
https://zhuanlan.zhihu.com/p/26444240

Data Leakage 數據洩漏

就是指你在 features 中直接或間接地加入了跟 target variable 有關的數據。

ref:
https://zhuanlan.zhihu.com/p/26444240

Target Engineering

雖然不能算是 feature engineering 的一部分,但是其實你也可以對 target variable / label(就是你的模型要預測的那個值)做點變換。例如 log(y + 1)exp(y) - 1

Spark troubleshooting

Spark troubleshooting

Apache Spark 2.x Troubleshooting Guide
https://www.slideshare.net/jcmia1/a-beginners-guide-on-troubleshooting-spark-applications
https://www.slideshare.net/jcmia1/apache-spark-20-tuning-guide

Check your cluster UI to ensure that workers are registered and have sufficient resources

PYSPARK_DRIVER_PYTHON="jupyter" \
PYSPARK_DRIVER_PYTHON_OPTS="notebook --ip 0.0.0.0" \
pyspark \
--packages "org.xerial:sqlite-jdbc:3.16.1,com.github.fommil.netlib:all:1.1.2" \
--driver-memory 4g \
--executor-memory 20g \
--master spark://TechnoCore.local:7077
TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources

可能是你指定的 --executor-memory 超過了 worker 的 memory。

你可以在 Spark Master UI http://localhost:8080/ 看到各個 worker 總共有多少 memory 可以用。如果每台 worker 可以用的 memory 容量不同,Spark 就只會選擇那些 memory 大於 --executor-memory 的 workers。

ref:
https://spoddutur.github.io/spark-notes/distribution_of_executors_cores_and_memory_for_spark_application

SparkContext was shut down

ERROR Executor: Exception in task 1.0 in stage 6034.0 (TID 21592)
java.lang.StackOverflowError
...
ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerJobEnd(55,1494185401195,JobFailed(org.apache.spark.SparkException: Job 55 cancelled because SparkContext was shut down))

可能是 executor 的記憶體不夠,導致 Out Of Memory (OOM) 了。

ref:
http://stackoverflow.com/questions/32822948/sparkcontext-was-shut-down-while-running-spark-on-a-large-dataset

Container exited with a non-zero exit code 56 (or some other numbers)

WARN org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Container marked as failed: container_1504241464590_0001_01_000002 on host: albedo-w-1.c.albedo-157516.internal. Exit status: 56. Diagnostics: Exception from container-launch.
Container id: container_1504241464590_0001_01_000002
Exit code: 56
Stack trace: ExitCodeException exitCode=56:
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:972)
    at org.apache.hadoop.util.Shell.run(Shell.java:869)
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1170)
    at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:236)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:305)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:84)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)

Container exited with a non-zero exit code 56

可能是 executor 的記憶體不夠,導致 Out Of Memory (OOM) 了。

ref:
http://stackoverflow.com/questions/39038460/understanding-spark-container-failure

Exception in thread "main" java.lang.StackOverflowError

Exception in thread "main" java.lang.StackOverflowError
    at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at scala.collection.immutable.List$SerializationProxy.writeObject(List.scala:468)
    at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    ...

解決辦法:

import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.sql.SparkSession

val spark: SparkSession = SparkSession.builder().getOrCreate()
val sc = spark.sparkContext
sc.setCheckpointDir("./spark-data/checkpoint")

// 因為 sc.setCheckpointDir() 就會啟用 checkpoint 了
// 所以可以不用特別指定 checkpointInterval
val als = new ALS()
  .setCheckpointInterval(2)

ref:
https://stackoverflow.com/questions/31484460/spark-gives-a-stackoverflowerror-when-training-using-als
https://stackoverflow.com/questions/35127720/what-is-the-difference-between-spark-checkpoint-and-persist-to-a-disk

Randomness of hash of string should be disabled via PYTHONHASHSEED

解決辦法:

$ cd $SPARK_HOME
$ cp conf/spark-env.sh.template conf/spark-env.sh
$ echo "export PYTHONHASHSEED=42" >> conf/spark-env.sh

ref:
https://issues.apache.org/jira/browse/SPARK-13330

It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transforamtion

Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

因為 spark.sparkContext 只能在 driver program 裡存取,不能被 worker 存取(例如那些丟給 RDD 執行的 lambda function 或是 UDF 就是在 worker 上執行的)。

ref:
https://spark.apache.org/docs/latest/rdd-programming-guide.html#passing-functions-to-spark
https://engineering.sharethrough.com/blog/2013/09/13/top-3-troubleshooting-tips-to-keep-you-sparking/

Spark automatically creates closures:

  • for functions that run on RDDs at workers,
  • and for any global variables that are used by those workers.

One closure is send per worker for every task. Closures are one way from the driver to the worker.

ref:
https://gerardnico.com/wiki/spark/closure

Unable to find encoder for type stored in a Dataset

Unable to find encoder for type stored in a Dataset.  Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases. someDF.as[SomeCaseClass]

解決辦法:

import spark.implicits._

yourDF.as[YourCaseClass]

ref:
https://stackoverflow.com/questions/38664972/why-is-unable-to-find-encoder-for-type-stored-in-a-dataset-when-creating-a-dat

Task not serializable

Caused by: java.io.NotSerializableException: Settings
Serialization stack:
    - object not serializable (class: Settings, value: Settings@2dfe2f00)
    - field (class: Settings$$anonfun$1, name: $outer, type: class Settings)
    - object (class Settings$$anonfun$1, <function1>)
Caused by: org.apache.spark.SparkException:
    Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)

通常是你在 closure functions 裡使用了 driver program 裡的某個 object,因為 Spark 會自動 serialize 那個被引用的 object 一起丟給 worker node 執行,所以如果那個 object 或是 class 沒辦法被 serialize,就會出現這個錯誤。

ref:
https://www.safaribooksonline.com/library/view/spark-the-definitive/9781491912201/ch04.html#user-defined-functions
http://www.puroguramingu.com/2016/02/26/spark-dos-donts.html
https://stackoverflow.com/questions/36176011/spark-sql-udf-task-not-serialisable
https://stackoverflow.com/questions/22592811/task-not-serializable-java-io-notserializableexception-when-calling-function-ou
https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/troubleshooting/javaionotserializableexception.html
https://mp.weixin.qq.com/s/BT6sXZlHcufAFLgTONCHsg

如果你只有在 Databricks Notebook 裡遇到這個錯誤,因為 Notebook 的運作機制跟一般的 Spark application 稍微有點不同,你可以試試 package cell。

ref:
https://docs.databricks.com/user-guide/notebooks/package-cells.html

java.lang.IllegalStateException: Cannot find any build directories.

java.lang.IllegalStateException: Cannot find any build directories.
    at org.apache.spark.launcher.CommandBuilderUtils.checkState(CommandBuilderUtils.java:248)
    at org.apache.spark.launcher.AbstractCommandBuilder.getScalaVersion(AbstractCommandBuilder.java:240)
    at org.apache.spark.launcher.AbstractCommandBuilder.buildClassPath(AbstractCommandBuilder.java:194)
    at org.apache.spark.launcher.AbstractCommandBuilder.buildJavaCommand(AbstractCommandBuilder.java:117)
    at org.apache.spark.launcher.WorkerCommandBuilder.buildCommand(WorkerCommandBuilder.scala:39)
    at org.apache.spark.launcher.WorkerCommandBuilder.buildCommand(WorkerCommandBuilder.scala:45)
    at org.apache.spark.deploy.worker.CommandUtils$.buildCommandSeq(CommandUtils.scala:63)
    at org.apache.spark.deploy.worker.CommandUtils$.buildProcessBuilder(CommandUtils.scala:51)
    at org.apache.spark.deploy.worker.ExecutorRunner.org$apache$spark$deploy$worker$ExecutorRunner$$fetchAndRunExecutor(ExecutorRunner.scala:145)
    at org.apache.spark.deploy.worker.ExecutorRunner$$anon$1.run(ExecutorRunner.scala:73)

可能的原因是沒有設置 SPARK_HOME 或是你的 launch script 沒有讀到該環境變數。

Observe system metrics, status, and logs on Linux

Observe system metrics, status, and logs on Linux

Linux commands that DevOps engineers (or SysAdmin) should know.

ref:
https://peteris.rocks/blog/htop/
http://techblog.netflix.com/2015/11/linux-performance-analysis-in-60s.html
http://techblog.netflix.com/2015/08/netflix-at-velocity-2015-linux.html

總覽

$ top

$ sudo apt-get install htop
$ htop

# 每 1 秒輸出一次資訊
$ vmstat 1
procs -----------memory---------- ---swap-- -----io---- -system-- ------cpu-----
 r  b   swpd   free   buff  cache   si   so    bi    bo   in   cs us sy id wa st
 1  0      0 1580104 171620 4287208    0    0     0    11    2    2  9  0 90  0  0
 0  0      0 1579832 171620 4287340    0    0     0     0 2871 2414 13  2 85  0  0
 0  0      0 1578688 171620 4287344    0    0     0    40 2311 1700 18  1 82  0  0
 1  0      0 1578640 171620 4287348    0    0     0    48 1302 1020  5  0 95  0  0
...

查 CPU

$ uptime

Load average: 0.03 0.11 0.19
Load average: 一分鐘 五分鐘 十五分鐘內的平均負載
單核心,如果 Load average 是 1 表示負載 100%
多核心的話,因為 Load average 是所有 CPU 數加起來,所以數值可能會大於 1

$ sudo apt-get install sysstat

# 每個 CPU 的使用率
$ mpstat -P ALL 1
Linux 3.13.0-49-generic (titanclusters-xxxxx)  07/14/2015  _x86_64_ (32 CPU)
07:38:49 PM  CPU   %usr  %nice   %sys %iowait   %irq  %soft  %steal  %guest  %gnice  %idle
07:38:50 PM  all  98.47   0.00   0.75    0.00   0.00   0.00    0.00    0.00    0.00   0.78
07:38:50 PM    0  96.04   0.00   2.97    0.00   0.00   0.00    0.00    0.00    0.00   0.99
07:38:50 PM    1  97.00   0.00   1.00    0.00   0.00   0.00    0.00    0.00    0.00   2.00
07:38:50 PM    2  98.00   0.00   1.00    0.00   0.00   0.00    0.00    0.00    0.00   1.00
...

# 每個 process 的 CPU 使用率
$ pidstat 1
Linux 3.13.0-49-generic (titanclusters-xxxxx)  07/14/2015    _x86_64_    (32 CPU)
07:41:02 PM   UID       PID    %usr %system  %guest    %CPU   CPU  Command
07:41:03 PM     0         9    0.00    0.94    0.00    0.94     1  rcuos/0
07:41:03 PM     0      4214    5.66    5.66    0.00   11.32    15  mesos-slave
07:41:03 PM     0      4354    0.94    0.94    0.00    1.89     8  java
07:41:03 PM     0      6521 1596.23    1.89    0.00 1598.11    27  java
...

查 Memory

$ free –m
             total       used       free     shared    buffers     cached
Mem:          7983       6443       1540          0        167       4192
-/+ buffers/cache:       2083       5900
Swap:            0          0          0

查 Disk

$ iostat -xz 1
Linux 3.13.0-49-generic (titanclusters-xxxxx)  07/14/2015  _x86_64_ (32 CPU)
avg-cpu:  %user   %nice %system %iowait  %steal   %idle
          73.96    0.00    3.73    0.03    0.06   22.21
Device:   rrqm/s   wrqm/s     r/s     w/s    rkB/s    wkB/s avgrq-sz avgqu-sz   await r_await w_await  svctm  %util
xvda        0.00     0.23    0.21    0.18     4.52     2.08    34.37     0.00    9.98   13.80    5.42   2.44   0.09
xvdb        0.01     0.00    1.02    8.94   127.97   598.53   145.79     0.00    0.43    1.78    0.28   0.25   0.25
xvdc        0.01     0.00    1.02    8.86   127.79   595.94   146.50     0.00    0.45    1.82    0.30   0.27   0.26

查 Disk Usage

# show whole disk
$ df -h

# show every folder under the directory
$ du -h /data

# show the top directory only
$ du -hs /var/lib/influxdb/data
77.4G    /var/lib/influxdb/data

# show largest top 10 files
$ du -hsx * | sort -rh | head -10

ref:
https://www.codecoffee.com/tipsforlinux/articles/22.html
https://www.cyberciti.biz/faq/how-do-i-find-the-largest-filesdirectories-on-a-linuxunixbsd-filesystem/

查 IO

$ sudo apt-get install dstat iotop

# 可以顯示哪些 process 在進行 io 操作
$ dstat --top-io --top-bio

# with –only option to see only processes or threads actually doing I/O
$ sudo iotop --only

ref:
https://www.cyberciti.biz/hardware/linux-iotop-simple-top-like-io-monitor/

查 CPU bound 或 IO bound

$ iostat -c | head -3 ; iostat -c 1 20

ref:
https://serverfault.com/questions/72209/cpu-or-network-i-o-bound
https://askubuntu.com/questions/1540/how-can-i-find-out-if-a-process-is-cpu-memory-or-disk-bound

iotop cannot is not working inside a container.

查 Process

$ ps aux
$ pstree -a

# attach to a process to find out system calls the process calls
# -t -- absolute timestamp
# -T -- print time spent in each syscall
# -s strsize -- limit length of print strings to STRSIZE chars (default 32)
# -f -- follow forks
# -e -- filtering expression: `option=trace,abbrev,verbose,raw,signal,read,write,fault`
# -u username -- run command as username handling setuid and/or setgid
$ strace -t -T -f -s 2048 -p THE_PID

# find out which files that nginx accesses
# you could try to find something related to the error message first:
# write(1, "Ign http://192.168.212.136 trusty Release\n", 62) = 62
# writev(12, [{"HTTP/1.1 500 Internal Server Error"..., 256}, {...}, {...}, {...}, 4]) = 276
$ strace -f -e trace=file service nginx start

# 顯示 PID 3001 的 process 是用什麼指令和參數啟動的
$ tr '\0' '\n' < /proc/3001/cmdline

# only on macOS
$ top -c a -p 1537

ref:
https://mp.weixin.qq.com/s/Sf79W5dqUFx7rUYRrtx88Q
https://blogs.oracle.com/linux/strace-the-sysadmins-microscope-v2
https://zwischenzugs.com/2011/08/29/my-favourite-secret-weapon-strace/

查 Kernel Logs

# 顯示最近的 15 筆 system messages
$ dmesg | tail -fn 15

# 顯示有關 killed process 的 logs
$ dmesg | grep -E -i -B50 'killed process'

ref:
https://stackoverflow.com/questions/726690/what-killed-my-process-and-why

查 Network

$ sar -n TCP,ETCP 1

查 DNS

Resolve a domain name using dig:

$ apt-get install curl dnsutils iputils-ping
# or
$ apk add --update bind-tools

$ dig +short october-api.default.svc.cluster.local
10.32.1.79

$ dig +short redis-broker.default.svc.cluster.local
10.60.32.20
10.60.33.15

$ dig +short redis-broker-0.redis-broker.default.svc.cluster.local
10.60.32.20

ref:
https://blog.longwin.com.tw/2013/03/dig-dns-query-debug-2013/

Resolve a domain name using nslookup:

$ apt-get install dnsutils

$ nslookup redis-broker.default.svc.cluster.local
Server:    10.3.240.10
Address 1: 10.3.240.10 kube-dns.kube-system.svc.cluster.local

Name:      redis-broker.default.svc.cluster.local
Address 1: 10.0.69.46 redis-broker-0.redis-broker.default.svc.cluster.local

Find specific types of DNS records:

$ nslookup -q=TXT codetengu.com
Server:     1.1.1.1
Address:    1.1.1.1#53

Non-authoritative answer:
codetengu.com    text = "zoho-verification=xxx.zmverify.zoho.com"

Authoritative answers can be found from:

nslookup could return multiple A records for a domain which is commonly known as round-robin DNS.

ref:
https://serverfault.com/questions/590277/why-does-nslookup-return-two-or-more-ip-address-for-yahoo-com-or-microsoft-com

查 Nginx

# 顯示各個 status code 的數量
$ cat access.log | cut -d '"' -f3 | cut -d ' ' -f2 | sort | uniq -c | sort -rn

# 顯示哪些 URL 的 404 數量最多
$ awk '($9 ~ /404/)' access.log | awk '{print $7}' | sort | uniq -c | sort -rn

# 顯示 2016/10/01 的 16:00 ~ 18:00 的 log
$ grep "01/Oct/2016:1[6-8]" access.log

# 顯示 2016/10/01 的 09:00 ~ 12:00 的 log
$ egrep "01/Oct/2016:(0[8-9]|1[0-2])" access.log

ref:
http://stackoverflow.com/questions/7575267/extract-data-from-log-file-in-specified-range-of-time
http://superuser.com/questions/848971/unix-command-to-grep-a-time-range

如果 status code 是 502 Bad Gateway
通常表示是 load balancer / nginx 的 upstream server 掛了或連不到
如果是 Kubernetes service 的話
可能是 Service spec.selector 跟 pod 匹配不起來

Build a recommender system with Spark: Implicit ALS

Build a recommender system with Spark: Implicit ALS

在這個系列的文章裡,我們將使用 Apache Spark、XGBoost、Elasticsearch 和 MySQL 等工具來搭建一個推薦系統的 Machine Learning Pipeline。推薦系統的組成可以粗略地分成 Candidate Generation 和 Ranking 兩個部分,前者是針對用戶產生候選物品集,常用的方法有 Collaborative Filtering、Content-based、標籤配對、熱門排行或人工精選等;後者則是對這些候選物品排序,以 Top N 的方式呈現最終的推薦結果,常用的方法有 Logistic Regression。

在本篇文章中,我們將以 Candidate Generation 階段常用的方法之一:Collaborative Filtering 協同過濾演算法為例,利用 Apache Spark 的 ALS (Alternating Least Squares) 模型建立一個 GitHub repositories 的推薦系統,以用戶對 repo 的打星紀錄作為訓練數據,推薦出用戶可能會感興趣的其他 repo 作為候選物品集。

完整的程式碼可以在 https://github.com/vinta/albedo 找到。

系列文章:

Submit the Application

因為需要使用 JDBC 讀取 MySQL 資料庫,必須安裝 MySQL driver,可以透過 --packages "mysql:mysql-connector-java:5.1.41" 參數在 cluster 的每一台機器上安裝需要的 Java packages。

$ spark-submit \
--packages "com.github.fommil.netlib:all:1.1.2,mysql:mysql-connector-java:5.1.41" \
--master spark://YOUR_SPARK_MASTER:7077 \
--py-files deps.zip \
train_als.py -u vinta

ref:
https://spark.apache.org/docs/latest/submitting-applications.html

Load Data

讀取來自 MySQL 資料庫的數據。你可以使用 predicates 參數來指定 WHERE 條件,雖然嚴格來說這個參數是用來控制 partition 數量的,一個條件就是一個 partition。

假設 app_repostarring 的欄位如下:

CREATE TABLE `app_repostarring` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `from_user_id` int(11) NOT NULL,
  `from_username` varchar(39) NOT NULL,
  `repo_owner_id` int(11) NOT NULL,
  `repo_owner_username` varchar(39) NOT NULL,
  `repo_owner_type` varchar(16) NOT NULL,
  `repo_id` int(11) NOT NULL,
  `repo_name` varchar(100) NOT NULL,
  `repo_full_name` varchar(140) NOT NULL,
  `repo_description` varchar(191) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT NULL,
  `repo_language` varchar(32) NOT NULL,
  `repo_created_at` datetime(6) NOT NULL,
  `repo_updated_at` datetime(6) NOT NULL,
  `starred_at` datetime(6) NOT NULL,
  `stargazers_count` int(11) NOT NULL,
  `forks_count` int(11) NOT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `from_user_id_repo_id` (`full_name`, `repo_id`)
);
def loadRawData():
    url = 'jdbc:mysql://127.0.0.1:3306/albedo?user=root&password=123&verifyServerCertificate=false&useSSL=false'
    properties = {'driver': 'com.mysql.jdbc.Driver'}
    rawDF = spark.read.jdbc(url, table='app_repostarring', properties=properties)
    return rawDF

rawDF = loadRawData()

ref:
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=jdbc#pyspark.sql.DataFrameReader.jdbc
http://www.gatorsmile.io/numpartitionsinjdbc/

Preprocess Data

Format Data

把 raw data 整理成 user,item,rating,starred_at 這樣的格式。starred_at 只有評價 model 時有用來排序,訓練 model 時並沒有用到,因為 Spark 的 ALS 沒辦法輕易地整合 side information。

from pyspark.ml import Transformer

class RatingBuilder(Transformer):

    def _transform(self, rawDF):
        ratingDF = rawDF \
            .selectExpr('from_user_id AS user', 'repo_id AS item', '1 AS rating', 'starred_at') \
            .orderBy('user', F.col('starred_at').desc())
        return ratingDF

ratingBuilder = RatingBuilder()
ratingDF = ratingBuilder.transform(rawDF)
ratingDF.cache()

ref:
http://blog.ethanrosenthal.com/2016/11/07/implicit-mf-part-2/

Inspect Data

import pyspark.sql.functions as F

ratingDF.rdd.getNumPartitions()
# 200

ratingDF.agg(F.count('rating'), F.countDistinct('user'), F.countDistinct('item')).show()
# +-------------+--------------------+--------------------+
# |count(rating)|count(DISTINCT user)|count(DISTINCT item)|
# +-------------+--------------------+--------------------+
# |      3121629|               10483|              551216|
# +-------------+--------------------+--------------------+

stargazersCountDF = ratingDF \
    .groupBy('item') \
    .agg(F.count('user').alias('stargazers_count')) \
    .orderBy('stargazers_count', ascending=False)
stargazersCountDF.show(10)
# +--------+----------------+
# |    item|stargazers_count|
# +--------+----------------+
# | 2126244|            2211|
# |10270250|            1683|
# |  943149|            1605|
# |  291137|            1567|
# |13491895|            1526|
# | 9384267|            1480|
# | 3544424|            1468|
# | 7691631|            1441|
# |29028775|            1427|
# | 1334369|            1399|
# +--------+----------------+

starredCountDF = ratingDF \
    .groupBy('user') \
    .agg(F.count('item').alias('starred_count')) \
    .orderBy('starred_count', ascending=False)
starredCountDF.show(10)
# +-------+-------------+
# |   user|starred_count|
# +-------+-------------+
# |3947125|         8947|
# |5527642|         7978|
# | 446613|         7860|
# | 627410|         7800|
# |  13998|         6334|
# |2467194|         6327|
# |  63402|         6034|
# |2005841|         6024|
# |5073946|         5980|
# |   2296|         5862|
# +-------+-------------+

Clean Data

你可以過濾掉那些太少 user 打星的 item 和打星了太少 item 的 user,提昇矩陣的稠密度。這個現象也正好是 Cold Start 的問題,你就是沒有足夠多的關於這些 item 和 user 的數據(可以考慮使用 content-based 的推薦方式)。除此之外,如果你的推薦系統所推薦的 item 只有非常少人打星,即便你完美地挖掘了長尾效應,這樣的推薦結果給用戶的「第一印象」可能也不會太好(這可能決定了他要不要繼續使用這個系統或是他要不要真的去嘗試那個你推薦給他的東西)。

你也可以選擇要不要過濾掉那些超多人打星的 item 和打星了超多 item 的 user。如果某些 item 有超過八、九成的 user 都打星了,對於這麼熱門的 item,可能也沒有推薦的必要了,因為其他 user 早晚也會自己發現的;如果有少數的 user 幾乎打星了一半以上的 item,這些 user 可能是屬於某種 web crawler 的用途或是這些 user 就是那種看到什麼就打星什麼的人,無論是哪一種,他們可能都不是你想要 modeling 的對象,可以考慮從 dataset 中拿掉。

實務上,如果你有關於 user 或 item 的黑名單,例如一些 SPAM 帳號或 NSFW 的內容等,也可以在這個步驟把它們過濾掉。

from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import Param
import pyspark.sql.functions as F

class DataCleaner(Transformer):

    @keyword_only
    def __init__(self, minItemStargazersCount=None, maxItemStargazersCount=None, minUserStarredCount=None, maxUserStarredCount=None):
        super(DataCleaner, self).__init__()
        self.minItemStargazersCount = Param(self, 'minItemStargazersCount', '移除 stargazer 數低於這個數字的 item')
        self.maxItemStargazersCount = Param(self, 'maxItemStargazersCount', '移除 stargazer 數超過這個數字的 item')
        self.minUserStarredCount = Param(self, 'minUserStarredCount', '移除 starred repo 數低於這個數字的 user')
        self.maxUserStarredCount = Param(self, 'maxUserStarredCount', '移除 starred repo 數超過這個數字的 user')
        self._setDefault(minItemStargazersCount=1, maxItemStargazersCount=50000, minUserStarredCount=1, maxUserStarredCount=50000)
        kwargs = self.__init__._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, minItemStargazersCount=None, maxItemStargazersCount=None, minUserStarredCount=None, maxUserStarredCount=None):
        kwargs = self.setParams._input_kwargs
        return self._set(**kwargs)

    def setMinItemStargazersCount(self, value):
        self._paramMap[self.minItemStargazersCount] = value
        return self

    def getMinItemStargazersCount(self):
        return self.getOrDefault(self.minItemStargazersCount)

    def setMaxItemStargazersCount(self, value):
        self._paramMap[self.maxItemStargazersCount] = value
        return self

    def getMaxItemStargazersCount(self):
        return self.getOrDefault(self.maxItemStargazersCount)

    def setMinUserStarredCount(self, value):
        self._paramMap[self.minUserStarredCount] = value
        return self

    def getMinUserStarredCount(self):
        return self.getOrDefault(self.minUserStarredCount)

    def setMaxUserStarredCount(self, value):
        self._paramMap[self.maxUserStarredCount] = value
        return self

    def getMaxUserStarredCount(self):
        return self.getOrDefault(self.maxUserStarredCount)

    def _transform(self, ratingDF):
        minItemStargazersCount = self.getMinItemStargazersCount()
        maxItemStargazersCount = self.getMaxItemStargazersCount()
        minUserStarredCount = self.getMinUserStarredCount()
        maxUserStarredCount = self.getMaxUserStarredCount()

        toKeepItemsDF = ratingDF \
            .groupBy('item') \
            .agg(F.count('user').alias('stargazers_count')) \
            .where('stargazers_count >= {0} AND stargazers_count <= {1}'.format(minItemStargazersCount, maxItemStargazersCount)) \
            .orderBy('stargazers_count', ascending=False) \
            .select('item', 'stargazers_count')
        temp1DF = ratingDF.join(toKeepItemsDF, 'item', 'inner')

        toKeepUsersDF = temp1DF \
            .groupBy('user') \
            .agg(F.count('item').alias('starred_count')) \
            .where('starred_count >= {0} AND starred_count <= {1}'.format(minUserStarredCount, maxUserStarredCount)) \
            .orderBy('starred_count', ascending=False) \
            .select('user', 'starred_count')
        temp2DF = temp1DF.join(toKeepUsersDF, 'user', 'inner')

        cleanDF = temp2DF.select('user', 'item', 'rating', 'starred_at')
        return cleanDF

dataCleaner = DataCleaner(
    minItemStargazersCount=2,
    maxItemStargazersCount=4000,
    minUserStarredCount=2,
    maxUserStarredCount=5000
)
cleanDF = dataCleaner.transform(ratingDF)

cleanDF.agg(F.count('rating'), F.countDistinct('user'), F.countDistinct('item')).show()
# +-------------+--------------------+--------------------+
# |count(rating)|count(DISTINCT user)|count(DISTINCT item)|
# +-------------+--------------------+--------------------+
# |      2761118|               10472|              245626|
# +-------------+--------------------+--------------------+

Generate Negative Samples

對 implicit feedback 的 ALS 來說,手動加入負樣本(Rui = 0 的樣本)是沒有意義的,因為 missing value / non-observed value 對該演算法來說本來就是 0,表示用戶確實沒有對該物品做出行為,也就是 Pui = 0 沒有偏好,所以 Cui = 1 + alpha x 0 置信度也會比其他正樣本低。不過因為 Spark ML 的 ALS 只會計算 Rui > 0 的項目,所以即便你手動加入了 Rui = 0 或 Rui = -1 的負樣本,對整個模型其實沒有影響。

雖然沒有負樣本你就不能算 area under ROC curve 或是 area under Precision-Recall curve 等 binary classifier 用的指標,不過你可以改用 Learning to rank 的評估方式,例如 NDCG 或 Mean Average Precision 等。但是 ALS 的 loss function 也沒辦法直接優化 NDCG 這樣的指標就是了。

ref:
https://vinta.ws/code/generate-negative-samples-for-recommender-system.html

Split Data

因為 Matrix Factorization 需要考慮每個 user-item pair,如果你餵給 model 它沒見過的資料,它就沒辦法進行推薦(冷啟動問題)。只要 user 或 item 其中之一不存在於 dataset 裡,ALS model 所輸出的 prediction 值就會是 NaN。所以應該盡量保持每個 user 和 item 都出現在 training set 和 testing set 裡,例如隨機挑出每個 user 的任意 n 個或 n 比例的評分作為 test set,剩下的評分當作 training set(俗稱 leave-n-out)。如果使用 Machine Learning 中常見的 holdout 方式,隨機地把所有 data point 分散到 training set 和 test set(例如 df.randomSplit([0.7, 0.3])),會有很高的機率造成部分 user 或 item 只出現在其中一組 dataset 裡。

ref:
https://jessesw.com/Rec-System/
http://blog.ethanrosenthal.com/2016/10/19/implicit-mf-part-1/

從 LibRec 的文件上也可以發現還有許多拆分數據的方式,例如:

  • 基于 Ratio 的分类方法为通过给定的比例来将数据分为两部分。这个分类过程可以在所有数据中进行随机分类,也可以在用户或者物品维度上进行分类。当有时间的特征时,可以根据时间顺序留出最后一定比例的数据来进行测试。
  • LooCV 的分割方法为 leave-one-user/item/rating-out,也就是随机选取每个 user 的任意一个 item 或者每个 item 的任意一个 user 作为测试数据,余下的数据来作为训练数据。在实现中实现了基于 User 和基于 Item 的多种分类方式。
  • GivenN 分割方法是指为每个用户留出指定数目 N 的数据来作为测试用例,余下的样本作为训练数据。
  • KCV 即 K 折交叉验证。将数据分割为 K 份,在每次执行时选择其中一份作为测试数据,余下的数据作为训练数据,共执行 K 次。综合 K 次的训练结果来对推荐算法的性能进行评估。

ref:
https://www.librec.net/dokuwiki/doku.php?id=DataModel_zh#splitter

這裡我們用 sampleBy() 簡單地寫了一個根據 user 來隨機劃分 item 到 training set 和 test set 的方法。

def randomSplitByUser(df, weights, seed=None):
    trainingRation = weights[0]
    fractions = {row['user']: trainingRation for row in df.select('user').distinct().collect()}
    training = df.sampleBy('user', fractions, seed)
    testRDD = df.rdd.subtract(training.rdd)
    test = spark.createDataFrame(testRDD, df.schema)
    return training, test

training, test = randomSplitByUser(ratingDF, weights=[0.7, 0.3])

ref:
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.sampleBy

Train the Model

from pyspark.ml.recommendation import ALS

als = ALS(implicitPrefs=True, seed=42) \
    .setRank(50) \
    .setMaxIter(22) \
    .setRegParam(0.5) \
    .setAlpha(40)

alsModel = als.fit(training)

# 這些就是訓練出來的 user 和 item 的 Latent Factors
alsModel.userFactors.show()
alsModel.itemFactors.show()

ref:
https://spark.apache.org/docs/latest/ml-collaborative-filtering.html
https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.recommendation.ALS

Predict Preferences

from pyspark.ml import Transformer

predictedDF = alsModel.transform(testing)

class PredictionProcessor(Transformer):

    def _transform(self, predictedDF):
        nonNullDF = predictedDF.dropna(subset=['prediction', ])
        predictionDF = nonNullDF.withColumn('prediction', nonNullDF['prediction'].cast('double'))
        return predictionDF

# 刪掉那些 NaN 的數據
predictionProcessor = PredictionProcessor()
predictionDF = predictionProcessor.transform(predictedDF)

Evaluate the Model

因為 Spark ML 沒有提供給 DataFrame 用的 ranking evaluator,我們只好自己寫一個,但是內部還是使用 Spark MLlib 的 RankingMetrics。不過這個只是 offline 的評估方式而已,等到要實際上線的時候可能還需要做 A/B testing。

from pyspark import keyword_only
from pyspark.ml.evaluation import Evaluator
from pyspark.ml.param.shared import Param
from pyspark.mllib.evaluation import RankingMetrics
from pyspark.sql import Window
from pyspark.sql.functions import col
from pyspark.sql.functions import expr
import pyspark.sql.functions as F

class RankingEvaluator(Evaluator):

    @keyword_only
    def __init__(self, k=None):
        super(RankingEvaluator, self).__init__()
        self.k = Param(self, 'k', 'Top K')
        self._setDefault(k=30)
        kwargs = self.__init__._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, k=None):
        kwargs = self.setParams._input_kwargs
        return self._set(**kwargs)

    def isLargerBetter(self):
        return True

    def setK(self, value):
        self._paramMap[self.k] = value
        return self

    def getK(self):
        return self.getOrDefault(self.k)

    def _evaluate(self, predictedDF):
        k = self.getK()

        predictedDF.show()

        windowSpec = Window.partitionBy('user').orderBy(col('prediction').desc())
        perUserPredictedItemsDF = predictedDF \
            .select('user', 'item', 'prediction', F.rank().over(windowSpec).alias('rank')) \
            .where('rank <= {0}'.format(k)) \
            .groupBy('user') \
            .agg(expr('collect_list(item) as items'))

        windowSpec = Window.partitionBy('user').orderBy(col('starred_at').desc())
        perUserActualItemsDF = predictedDF \
            .select('user', 'item', 'starred_at', F.rank().over(windowSpec).alias('rank')) \
            .where('rank <= {0}'.format(k)) \
            .groupBy('user') \
            .agg(expr('collect_list(item) as items'))

        perUserItemsRDD = perUserPredictedItemsDF.join(F.broadcast(perUserActualItemsDF), 'user', 'inner') \
            .rdd \
            .map(lambda row: (row[1], row[2]))

        if perUserItemsRDD.isEmpty():
            return 0.0

        rankingMetrics = RankingMetrics(perUserItemsRDD)
        metric = rankingMetrics.ndcgAt(k)
        return metric

k = 30
rankingEvaluator = RankingEvaluator(k=k)
ndcg = rankingEvaluator.evaluate(predictionDF)
print('NDCG', ndcg)

ref:
https://spark.apache.org/docs/latest/mllib-evaluation-metrics.html

Recommend Items

實際感受一下推薦系統的效果如何。這裡是直接把結果 print 出來,而沒有把推薦結果儲存到資料庫。不過通常不會直接就把推薦系統輸出的東西展示給用戶,會先經過一些過濾、排序和產生推薦理由等等的步驟,或是加入一些人為的規則,比如說強制插入廣告、最近主打的商品或是過濾掉那些很多人點擊但是其實質量並不怎麼樣的東西。當然也有可能會把這個推薦系統的輸出作為其他機器學習 model 的輸入。

ref:
https://www.zhihu.com/question/28247353

def recommendItems(rawDF, alsModel, username, topN=30, excludeKnownItems=False):
    userID = rawDF \
        .where('from_username = "{0}"'.format(username)) \
        .select('from_user_id') \
        .take(1)[0]['from_user_id']

    userItemsDF = alsModel \
        .itemFactors. \
        selectExpr('{0} AS user'.format(userID), 'id AS item')
    if excludeKnownItems:
        userKnownItemsDF = rawDF \
            .where('from_user_id = {0}'.format(userID)) \
            .selectExpr('repo_id AS item')
        userItemsDF = userItemsDF.join(userKnownItemsDF, 'item', 'left_anti')

    userPredictedDF = alsModel \
        .transform(userItemsDF) \
        .select('item', 'prediction') \
        .orderBy('prediction', ascending=False) \
        .limit(topN)

    repoDF = rawDF \
        .groupBy('repo_id', 'repo_full_name', 'repo_language') \
        .agg(F.max('stargazers_count').alias('stargazers_count'))

    recommendedItemsDF = userPredictedDF \
        .join(repoDF, userPredictedDF['item'] == repoDF['repo_id'], 'inner') \
        .select('prediction', 'repo_full_name', 'repo_language', 'stargazers_count') \
        .orderBy('prediction', ascending=False)

    return recommendedItemsDF

k = 30
username = 'vinta'
recommendedItemsDF = recommendItems(rawDF, alsModel, username, topN=k, excludeKnownItems=False)
for item in recommendedItemsDF.collect():
    repoName = item['repo_full_name']
    repoUrl = 'https://github.com/{0}'.format(repoName)
    print(repoUrl, item['prediction'], item['repo_language'], item['stargazers_count'])

ref:
https://github.com/vinta/albedo/blob/master/src/main/python/train_als.ipynb

Cross-validate Models

使用 Spark ML 的 pipeline 來做 cross-validation,選出最適合的 hyperparameters 組合。

  • rank: The number of latent factors in the model, or equivalently, the number of columns k in the user-feature and product-feature matrices.
  • regParam: A standard overfitting parameter, also usually called lambda. Higher values resist overfitting, but values that are too high hurt the factorization’s accuracy.
  • alpha: Controls the relative weight of observed versus unobserved user-product interactions in the factorization.
  • maxIter: The number of iterations that the factorization runs. More iterations take more time but may produce a better factorization.
dataCleaner = DataCleaner()

als = ALS(implicitPrefs=True, seed=42)

predictionProcessor = PredictionProcessor()

pipeline = Pipeline(stages=[
    dataCleaner,
    als,
    predictionProcessor,
])

paramGrid = ParamGridBuilder() \
    .addGrid(dataCleaner.minItemStargazersCount, [1, 10, 100]) \
    .addGrid(dataCleaner.maxItemStargazersCount, [4000, ]) \
    .addGrid(dataCleaner.minUserStarredCount, [1, 10, 100]) \
    .addGrid(dataCleaner.maxUserStarredCount, [1000, 4000, ]) \
    .addGrid(als.rank, [50, 100]) \
    .addGrid(als.regParam, [0.01, 0.1, 0.5]) \
    .addGrid(als.alpha, [0.01, 0.89, 1, 40, ]) \
    .addGrid(als.maxIter, [22, ]) \
    .build()

rankingEvaluator = RankingEvaluator(k=30)

cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=paramGrid,
                    evaluator=rankingEvaluator,
                    numFolds=2)

cvModel = cv.fit(ratingDF)

def printCrossValidationParameters(cvModel):
    metric_params_pairs = list(zip(cvModel.avgMetrics, cvModel.getEstimatorParamMaps()))
    metric_params_pairs.sort(key=lambda x: x[0], reverse=True)
    for pair in metric_params_pairs:
        metric, params = pair
        print('metric', metric)
        for k, v in params.items():
            print(k.name, v)
        print('')

printCrossValidationParameters(cvModel)

ref:
https://spark.apache.org/docs/latest/ml-pipeline.html