Play with GitHub Archive Dataset on BigQuery

Play with GitHub Archive Dataset on BigQuery

Google BigQuery is a web service that lets you do interactive analysis of very massive datasets - analyzing billions of rows in seconds.

ref:
https://www.githubarchive.org/#bigquery
https://bigquery.cloud.google.com/table/githubarchive:month.201612

See also:
http://ghtorrent.org/

Show repository informations (1)

WITH repo_info AS (
  SELECT repo.id AS id, repo.name AS name, JSON_EXTRACT_SCALAR(payload, '$.pull_request.base.repo.description') AS description
  FROM `githubarchive.month.2017*`
  -- FROM `githubarchive.year.2016`
  -- FROM `githubarchive.year.*`
  WHERE type = "PullRequestEvent"
)

SELECT repo_info.name, ANY_VALUE(repo_info.description) AS description
FROM repo_info
WHERE
  repo_info.description IS NOT NULL AND
  repo_info.description != ""
GROUP BY repo_info.name
ORDER BY repo_info.name

ref:
https://cloud.google.com/bigquery/docs/reference/standard-sql/functions-and-operators#json-functions
https://cloud.google.com/bigquery/docs/reference/standard-sql/functions-and-operators#any_value

Show repository informations (2)

WITH repo_info AS (
  SELECT repo.id AS id, repo.name AS name, JSON_EXTRACT_SCALAR(payload, '$.description') AS description
  FROM `githubarchive.month.201708`
  WHERE type = "CreateEvent"
)

SELECT repo_info.name, ANY_VALUE(repo_info.description) AS description
FROM repo_info
WHERE
  repo_info.description IS NOT NULL AND
  repo_info.description != ""
GROUP BY repo_info.name
ORDER BY repo_info.name

Show repository informations (3)

SELECT name, description
FROM `ghtorrent-bq.ght_2017_04_01.projects`
WHERE
  forked_from IS NULL AND
  description IS NOT NULL AND
  description != ""

Show starred repositories by a specific user

You must use WatchEvent for starring a repo:
https://developer.github.com/v3/activity/events/types/#watchevent

SELECT repo.name, created_at
FROM TABLE_QUERY([githubarchive:month], 'LEFT(table_ID,4) IN ("2017","2016","2015")') 
WHERE type = "WatchEvent" AND actor.login = 'vinta'
GROUP BY repo.name, created_at
ORDER BY created_at DESC

Show starred repositories per user who has 10+ starred repositories

WITH stars AS (
     SELECT DISTINCT actor.login AS user, repo.name AS repo
     FROM `githubarchive.month.2017*`
     WHERE type="WatchEvent"
),
repositories_stars AS (
     SELECT repo, COUNT(*) as c FROM stars GROUP BY repo
     ORDER BY c DESC
     LIMIT 1000
),
users_stars AS (
    SELECT user, COUNT(*) as c FROM  stars
    WHERE repo IN (SELECT repo FROM repositories_stars)
    GROUP BY user
    HAVING c >= 10
    LIMIT 10000
)
SELECT user, repo FROM stars
WHERE repo IN (SELECT repo FROM repositories_stars)
AND user IN (SELECT user FROM users_stars)

ref:
https://gist.github.com/jbochi/2e8ddcc5939e70e5368326aa034a144e

Setup Spark, Scala and Maven with Intellij IDEA

Setup Spark, Scala and Maven with Intellij IDEA

IntelliJ IDEA supports Scala and Apache Spark perfectly. You're able to browse a complete Spark project built with IntelliJ IDEA on GitHub: https://github.com/vinta/albedo

Useful Plugins:

Initiate a Maven Project

$ mvn archetype:generate
Choose a number: xxx
xxx: remote -> net.alchim31.maven:scala-archetype-simple

ref:
https://docs.scala-lang.org/tutorials/scala-with-maven.html

Example Configurations

The remaining section of this article assumes that you use this pom.xml which should be able to work out of the box.

<!-- in pom.xml -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>ws.vinta</groupId>
  <artifactId>albedo</artifactId>
  <version>1.0.0-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>${project.artifactId}</name>
  <description>A recommender system for discovering GitHub repos</description>
  <url>https://github.com/vinta/albedo</url>
  <inceptionYear>2017</inceptionYear>
  <properties>
    <java.version>1.8</java.version>
    <scala.version>2.11.8</scala.version>
    <scala.compactVersion>2.11</scala.compactVersion>
    <spark.version>2.2.0</spark.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
  <repositories>
    <repository>
      <id>spark-packages</id>
      <name>Spark Packages Repository</name>
      <url>https://dl.bintray.com/spark-packages/maven/</url>
    </repository>
  </repositories>
  <dependencies>
    <!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.compactVersion}</artifactId>
      <version>${spark.version}</version>
      <scope>compile</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.11 -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_${scala.compactVersion}</artifactId>
      <version>${spark.version}</version>
      <scope>compile</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-mllib_2.11 -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-mllib_${scala.compactVersion}</artifactId>
      <version>${spark.version}</version>
      <scope>compile</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.42</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.httpcomponents/httpclient -->
    <dependency>
      <groupId>org.apache.httpcomponents</groupId>
      <artifactId>httpclient</artifactId>
      <version>4.5.2</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.elasticsearch.client/elasticsearch-rest-high-level-client -->
    <dependency>
      <groupId>org.elasticsearch.client</groupId>
      <artifactId>elasticsearch-rest-high-level-client</artifactId>
      <version>5.6.2</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.hankcs/hanlp -->
    <dependency>
      <groupId>com.hankcs</groupId>
      <artifactId>hanlp</artifactId>
      <version>portable-1.3.4</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.github.rholder/snowball-stemmer -->
    <dependency>
      <groupId>com.github.rholder</groupId>
      <artifactId>snowball-stemmer</artifactId>
      <version>1.3.0.581.1</version>
    </dependency>
  </dependencies>
  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.6.1</version>
        <configuration>
          <source>${java.version}</source>
          <target>${java.version}</target>
          <encoding>UTF-8</encoding>
        </configuration>
      </plugin>
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.1</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-install-plugin</artifactId>
        <version>2.5.2</version>
        <configuration>
          <skip>true</skip>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.1.0</version>
        <executions>
          <execution>
           <phase>install</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <filters>
                <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                    <exclude>META-INF/*.SF</exclude>
                  </excludes>
                </filter>
              </filters>
              <artifactSet>
                <excludes>
                  <exclude>com.apple:AppleJavaExtensions:*</exclude>
                  <exclude>javax.servlet:*</exclude>
                  <exclude>org.apache.hadoop:*</exclude>
                  <exclude>org.apache.maven.plugins:*</exclude>
                  <exclude>org.apache.parquet:*</exclude>
                  <exclude>org.apache.spark:*</exclude>
                  <exclude>org.scala-lang:*</exclude>
                </excludes>
              </artifactSet>
              <finalName>${project.artifactId}-${project.version}-uber</finalName>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>

ref:
https://davidb.github.io/scala-maven-plugin/example_compile.html

Generate a Thin JAR

Thin JAR only contains classes that you created, which means you should include your dependencies externally.

$ mvn clean package -DskipTests

You're able to specify different classes in the same JAR.

$ spark-submit \
--master spark://localhost:7077 \
--packages "mysql:mysql-connector-java:5.1.41" \
--class ws.vinta.albedo.LogisticRegressionRanker \
target/albedo-1.0.0-SNAPSHOT.jar

ref:
https://stackoverflow.com/questions/1082580/how-to-build-jars-from-intellij-properly
https://spark.apache.org/docs/latest/submitting-applications.html

Generate a Fat JAR, Shaded JAR or Uber JAR

CAUTION: DO NOT ENABLE <minimizeJar>true</minimizeJar> in the maven-shade-plugin, it will ruin your day!

<!-- in pom.xml -->
<project>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-install-plugin</artifactId>
        <version>2.5.2</version>
        <configuration>
          <skip>true</skip>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.1.0</version>
        <executions>
          <execution>
            <phase>install</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <artifactSet>
                <excludes>
                  <exclude>com.apple:AppleJavaExtensions:*</exclude>
                  <exclude>javax.servlet:*</exclude>
                  <exclude>org.apache.hadoop:*</exclude>
                  <exclude>org.apache.maven.plugins:*</exclude>
                  <exclude>org.apache.parquet:*</exclude>
                  <exclude>org.apache.spark:*</exclude>
                  <exclude>org.scala-lang:*</exclude>
                </excludes>
              </artifactSet>
              <finalName>${project.artifactId}-${project.version}-uber</finalName>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
# the output jar will be located in "target/albedo-1.0.0-SNAPSHOT-uber.jar"
$ mvn clean install -DskipTests
$ spark-submit \
--master spark://localhost:7077 \
--class ws.vinta.albedo.LogisticRegressionRanker \
target/albedo-1.0.0-SNAPSHOT-uber.jar

ref:
http://maven.apache.org/plugins/maven-shade-plugin/examples/includes-excludes.html
https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/troubleshooting/missing_dependencies_in_jar_files.html

Run a Spark Application in Local Mode

Follow "Run > Edit Configurations":

  • VM options: -Xms12g -Xmx12g -Dspark.master="local[*]"
  • Before launch:
    • Build

Or

// in YourSparkApp.scala
package ws.vinta.albedo

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object YourSparkApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .set("spark.driver.memory", "12g")

    val spark = SparkSession
      .builder()
      .appName("YourSparkApp")
      .config(conf)
      .getOrCreate()

    spark.stop()
  }
}

ref:
https://github.com/jaceklaskowski/mastering-apache-spark-book/blob/master/spark-sparkcontext-creating-instance-internals.adoc
https://stackoverflow.com/questions/43054268/how-to-set-spark-memorystore-size-when-running-in-intellij-scala-console

Run a Spark Application in Standalone Mode

First, start your Spark Standalone cluster:

$ cd ${SPARK_HOME}
$ ./sbin/start-master.sh -h 0.0.0.0
$ ./sbin/start-slave.sh spark://localhost:7077

# print logs from Spark master and workers, useful for debuging
$ tail -f ${SPARK_HOME}/logs/*

Follow "Run > Edit Configurations":

  • VM options: -Dspark.master=spark://localhost:7077 -Dspark.driver.memory=2g -Dspark.executor.memory=12g -Dspark.executor.cores=3
    • Local cluster mode: -Dspark.master="local-cluster[x, y, z]" with x workers, y cores per worker, and z MB memory per worker
    • Local cluster mode doesn't need a real Spark Standalone cluster
  • Before launch:
    • Build
    • Run Maven Goal 'albedo: clean install'

Or

// in YourSparkApp.scala
package ws.vinta.albedo

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object YourSparkApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("spark://localhost:7077")
      .set("spark.driver.memory", "2g")
      .set("spark.executor.memory", "12g")
      .set("spark.executor.cores", "3")
      .setJars(List("target/albedo-1.0.0-SNAPSHOT-uber.jar"))
      // or
      .setMaster("local-cluster[1, 3, 12288]")
      .setJars(List("target/albedo-1.0.0-SNAPSHOT-uber.jar"))

    val spark = SparkSession
      .builder()
      .appName("YourSparkApp")
      .config(conf)
      .getOrCreate()

    spark.stop()
  }
}

In the end, there are some glossaries which need to be clarified:

  • compile: compile a single .java or .scala into *.class
  • make: compile changed files only
  • build: compile every files in the project

ref:
http://www.jianshu.com/p/b4e4658c459c

Specify a Custom Logging Configuration

$ cd PROJECT_ROOT
$ cp $SPARK_HOME/conf/log4j.properties.template log4j.properties

Follow "Run > Edit Configurations":

  • VM options: -Dlog4j.configuration=file:./log4j.properties

ref:
https://spark.apache.org/docs/latest/sql-programming-guide.html#getting-started
https://stackoverflow.com/questions/43054268/how-to-set-spark-memorystore-size-when-running-in-intellij-scala-console
https://community.cloudera.com/t5/Advanced-Analytics-Apache-Spark/Config-log4j-in-Spark/td-p/34968

Spark troubleshooting

Spark troubleshooting

Apache Spark 2.x Troubleshooting Guide
https://www.slideshare.net/jcmia1/a-beginners-guide-on-troubleshooting-spark-applications
https://www.slideshare.net/jcmia1/apache-spark-20-tuning-guide

Check your cluster UI to ensure that workers are registered and have sufficient resources

PYSPARK_DRIVER_PYTHON="jupyter" \
PYSPARK_DRIVER_PYTHON_OPTS="notebook --ip 0.0.0.0" \
pyspark \
--packages "org.xerial:sqlite-jdbc:3.16.1,com.github.fommil.netlib:all:1.1.2" \
--driver-memory 4g \
--executor-memory 20g \
--master spark://TechnoCore.local:7077
TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources

可能是你指定的 --executor-memory 超過了 worker 的 memory。

你可以在 Spark Master UI http://localhost:8080/ 看到各個 worker 總共有多少 memory 可以用。如果每台 worker 可以用的 memory 容量不同,Spark 就只會選擇那些 memory 大於 --executor-memory 的 workers。

ref:
https://spoddutur.github.io/spark-notes/distribution_of_executors_cores_and_memory_for_spark_application

SparkContext was shut down

ERROR Executor: Exception in task 1.0 in stage 6034.0 (TID 21592)
java.lang.StackOverflowError
...
ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerJobEnd(55,1494185401195,JobFailed(org.apache.spark.SparkException: Job 55 cancelled because SparkContext was shut down))

可能是 executor 的記憶體不夠,導致 Out Of Memory (OOM) 了。

ref:
http://stackoverflow.com/questions/32822948/sparkcontext-was-shut-down-while-running-spark-on-a-large-dataset

Container exited with a non-zero exit code 56 (or some other numbers)

WARN org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Container marked as failed: container_1504241464590_0001_01_000002 on host: albedo-w-1.c.albedo-157516.internal. Exit status: 56. Diagnostics: Exception from container-launch.
Container id: container_1504241464590_0001_01_000002
Exit code: 56
Stack trace: ExitCodeException exitCode=56:
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:972)
    at org.apache.hadoop.util.Shell.run(Shell.java:869)
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1170)
    at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:236)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:305)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:84)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)

Container exited with a non-zero exit code 56

可能是 executor 的記憶體不夠,導致 Out Of Memory (OOM) 了。

ref:
http://stackoverflow.com/questions/39038460/understanding-spark-container-failure

Exception in thread "main" java.lang.StackOverflowError

Exception in thread "main" java.lang.StackOverflowError
    at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at scala.collection.immutable.List$SerializationProxy.writeObject(List.scala:468)
    at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    ...

解決辦法:

import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.sql.SparkSession

val spark: SparkSession = SparkSession.builder().getOrCreate()
val sc = spark.sparkContext
sc.setCheckpointDir("./spark-data/checkpoint")

// 因為 sc.setCheckpointDir() 就會啟用 checkpoint 了
// 所以可以不用特別指定 checkpointInterval
val als = new ALS()
  .setCheckpointInterval(2)

ref:
https://stackoverflow.com/questions/31484460/spark-gives-a-stackoverflowerror-when-training-using-als
https://stackoverflow.com/questions/35127720/what-is-the-difference-between-spark-checkpoint-and-persist-to-a-disk

Randomness of hash of string should be disabled via PYTHONHASHSEED

解決辦法:

$ cd $SPARK_HOME
$ cp conf/spark-env.sh.template conf/spark-env.sh
$ echo "export PYTHONHASHSEED=42" >> conf/spark-env.sh

ref:
https://issues.apache.org/jira/browse/SPARK-13330

It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transforamtion

Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

因為 spark.sparkContext 只能在 driver program 裡存取,不能被 worker 存取(例如那些丟給 RDD 執行的 lambda function 或是 UDF 就是在 worker 上執行的)。

ref:
https://spark.apache.org/docs/latest/rdd-programming-guide.html#passing-functions-to-spark
https://engineering.sharethrough.com/blog/2013/09/13/top-3-troubleshooting-tips-to-keep-you-sparking/

Spark automatically creates closures:

  • for functions that run on RDDs at workers,
  • and for any global variables that are used by those workers.

One closure is send per worker for every task. Closures are one way from the driver to the worker.

ref:
https://gerardnico.com/wiki/spark/closure

Unable to find encoder for type stored in a Dataset

Unable to find encoder for type stored in a Dataset.  Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases. someDF.as[SomeCaseClass]

解決辦法:

import spark.implicits._

yourDF.as[YourCaseClass]

ref:
https://stackoverflow.com/questions/38664972/why-is-unable-to-find-encoder-for-type-stored-in-a-dataset-when-creating-a-dat

Task not serializable

Caused by: java.io.NotSerializableException: Settings
Serialization stack:
    - object not serializable (class: Settings, value: Settings@2dfe2f00)
    - field (class: Settings$$anonfun$1, name: $outer, type: class Settings)
    - object (class Settings$$anonfun$1, <function1>)
Caused by: org.apache.spark.SparkException:
    Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)

通常是你在 closure functions 裡使用了 driver program 裡的某個 object,因為 Spark 會自動 serialize 那個被引用的 object 一起丟給 worker node 執行,所以如果那個 object 或是 class 沒辦法被 serialize,就會出現這個錯誤。

ref:
https://www.safaribooksonline.com/library/view/spark-the-definitive/9781491912201/ch04.html#user-defined-functions
http://www.puroguramingu.com/2016/02/26/spark-dos-donts.html
https://stackoverflow.com/questions/36176011/spark-sql-udf-task-not-serialisable
https://stackoverflow.com/questions/22592811/task-not-serializable-java-io-notserializableexception-when-calling-function-ou
https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/troubleshooting/javaionotserializableexception.html
https://mp.weixin.qq.com/s/BT6sXZlHcufAFLgTONCHsg

如果你只有在 Databricks Notebook 裡遇到這個錯誤,因為 Notebook 的運作機制跟一般的 Spark application 稍微有點不同,你可以試試 package cell。

ref:
https://docs.databricks.com/user-guide/notebooks/package-cells.html

java.lang.IllegalStateException: Cannot find any build directories.

java.lang.IllegalStateException: Cannot find any build directories.
    at org.apache.spark.launcher.CommandBuilderUtils.checkState(CommandBuilderUtils.java:248)
    at org.apache.spark.launcher.AbstractCommandBuilder.getScalaVersion(AbstractCommandBuilder.java:240)
    at org.apache.spark.launcher.AbstractCommandBuilder.buildClassPath(AbstractCommandBuilder.java:194)
    at org.apache.spark.launcher.AbstractCommandBuilder.buildJavaCommand(AbstractCommandBuilder.java:117)
    at org.apache.spark.launcher.WorkerCommandBuilder.buildCommand(WorkerCommandBuilder.scala:39)
    at org.apache.spark.launcher.WorkerCommandBuilder.buildCommand(WorkerCommandBuilder.scala:45)
    at org.apache.spark.deploy.worker.CommandUtils$.buildCommandSeq(CommandUtils.scala:63)
    at org.apache.spark.deploy.worker.CommandUtils$.buildProcessBuilder(CommandUtils.scala:51)
    at org.apache.spark.deploy.worker.ExecutorRunner.org$apache$spark$deploy$worker$ExecutorRunner$$fetchAndRunExecutor(ExecutorRunner.scala:145)
    at org.apache.spark.deploy.worker.ExecutorRunner$$anon$1.run(ExecutorRunner.scala:73)

可能的原因是沒有設置 SPARK_HOME 或是你的 launch script 沒有讀到該環境變數。

Build a recommender system with Spark: Implicit ALS

Build a recommender system with Spark: Implicit ALS

在這個系列的文章裡,我們將使用 Apache Spark、XGBoost、Elasticsearch 和 MySQL 等工具來搭建一個推薦系統的 Machine Learning Pipeline。推薦系統的組成可以粗略地分成 Candidate Generation 和 Ranking 兩個部分,前者是針對用戶產生候選物品集,常用的方法有 Collaborative Filtering、Content-based、標籤配對、熱門排行或人工精選等;後者則是對這些候選物品排序,以 Top N 的方式呈現最終的推薦結果,常用的方法有 Logistic Regression。

在本篇文章中,我們將以 Candidate Generation 階段常用的方法之一:Collaborative Filtering 協同過濾演算法為例,利用 Apache Spark 的 ALS (Alternating Least Squares) 模型建立一個 GitHub repositories 的推薦系統,以用戶對 repo 的打星紀錄作為訓練數據,推薦出用戶可能會感興趣的其他 repo 作為候選物品集。

完整的程式碼可以在 https://github.com/vinta/albedo 找到。

系列文章:

Submit the Application

因為需要使用 JDBC 讀取 MySQL 資料庫,必須安裝 MySQL driver,可以透過 --packages "mysql:mysql-connector-java:5.1.41" 參數在 cluster 的每一台機器上安裝需要的 Java packages。

$ spark-submit \
--packages "com.github.fommil.netlib:all:1.1.2,mysql:mysql-connector-java:5.1.41" \
--master spark://YOUR_SPARK_MASTER:7077 \
--py-files deps.zip \
train_als.py -u vinta

ref:
https://spark.apache.org/docs/latest/submitting-applications.html

Load Data

讀取來自 MySQL 資料庫的數據。你可以使用 predicates 參數來指定 WHERE 條件,雖然嚴格來說這個參數是用來控制 partition 數量的,一個條件就是一個 partition。

假設 app_repostarring 的欄位如下:

CREATE TABLE `app_repostarring` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `from_user_id` int(11) NOT NULL,
  `from_username` varchar(39) NOT NULL,
  `repo_owner_id` int(11) NOT NULL,
  `repo_owner_username` varchar(39) NOT NULL,
  `repo_owner_type` varchar(16) NOT NULL,
  `repo_id` int(11) NOT NULL,
  `repo_name` varchar(100) NOT NULL,
  `repo_full_name` varchar(140) NOT NULL,
  `repo_description` varchar(191) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT NULL,
  `repo_language` varchar(32) NOT NULL,
  `repo_created_at` datetime(6) NOT NULL,
  `repo_updated_at` datetime(6) NOT NULL,
  `starred_at` datetime(6) NOT NULL,
  `stargazers_count` int(11) NOT NULL,
  `forks_count` int(11) NOT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `from_user_id_repo_id` (`full_name`, `repo_id`)
);
def loadRawData():
    url = 'jdbc:mysql://127.0.0.1:3306/albedo?user=root&password=123&verifyServerCertificate=false&useSSL=false'
    properties = {'driver': 'com.mysql.jdbc.Driver'}
    rawDF = spark.read.jdbc(url, table='app_repostarring', properties=properties)
    return rawDF

rawDF = loadRawData()

ref:
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=jdbc#pyspark.sql.DataFrameReader.jdbc
http://www.gatorsmile.io/numpartitionsinjdbc/

Preprocess Data

Format Data

把 raw data 整理成 user,item,rating,starred_at 這樣的格式。starred_at 只有評價 model 時有用來排序,訓練 model 時並沒有用到,因為 Spark 的 ALS 沒辦法輕易地整合 side information。

from pyspark.ml import Transformer

class RatingBuilder(Transformer):

    def _transform(self, rawDF):
        ratingDF = rawDF \
            .selectExpr('from_user_id AS user', 'repo_id AS item', '1 AS rating', 'starred_at') \
            .orderBy('user', F.col('starred_at').desc())
        return ratingDF

ratingBuilder = RatingBuilder()
ratingDF = ratingBuilder.transform(rawDF)
ratingDF.cache()

ref:
http://blog.ethanrosenthal.com/2016/11/07/implicit-mf-part-2/

Inspect Data

import pyspark.sql.functions as F

ratingDF.rdd.getNumPartitions()
# 200

ratingDF.agg(F.count('rating'), F.countDistinct('user'), F.countDistinct('item')).show()
# +-------------+--------------------+--------------------+
# |count(rating)|count(DISTINCT user)|count(DISTINCT item)|
# +-------------+--------------------+--------------------+
# |      3121629|               10483|              551216|
# +-------------+--------------------+--------------------+

stargazersCountDF = ratingDF \
    .groupBy('item') \
    .agg(F.count('user').alias('stargazers_count')) \
    .orderBy('stargazers_count', ascending=False)
stargazersCountDF.show(10)
# +--------+----------------+
# |    item|stargazers_count|
# +--------+----------------+
# | 2126244|            2211|
# |10270250|            1683|
# |  943149|            1605|
# |  291137|            1567|
# |13491895|            1526|
# | 9384267|            1480|
# | 3544424|            1468|
# | 7691631|            1441|
# |29028775|            1427|
# | 1334369|            1399|
# +--------+----------------+

starredCountDF = ratingDF \
    .groupBy('user') \
    .agg(F.count('item').alias('starred_count')) \
    .orderBy('starred_count', ascending=False)
starredCountDF.show(10)
# +-------+-------------+
# |   user|starred_count|
# +-------+-------------+
# |3947125|         8947|
# |5527642|         7978|
# | 446613|         7860|
# | 627410|         7800|
# |  13998|         6334|
# |2467194|         6327|
# |  63402|         6034|
# |2005841|         6024|
# |5073946|         5980|
# |   2296|         5862|
# +-------+-------------+

Clean Data

你可以過濾掉那些太少 user 打星的 item 和打星了太少 item 的 user,提昇矩陣的稠密度。這個現象也正好是 Cold Start 的問題,你就是沒有足夠多的關於這些 item 和 user 的數據(可以考慮使用 content-based 的推薦方式)。除此之外,如果你的推薦系統所推薦的 item 只有非常少人打星,即便你完美地挖掘了長尾效應,這樣的推薦結果給用戶的「第一印象」可能也不會太好(這可能決定了他要不要繼續使用這個系統或是他要不要真的去嘗試那個你推薦給他的東西)。

你也可以選擇要不要過濾掉那些超多人打星的 item 和打星了超多 item 的 user。如果某些 item 有超過八、九成的 user 都打星了,對於這麼熱門的 item,可能也沒有推薦的必要了,因為其他 user 早晚也會自己發現的;如果有少數的 user 幾乎打星了一半以上的 item,這些 user 可能是屬於某種 web crawler 的用途或是這些 user 就是那種看到什麼就打星什麼的人,無論是哪一種,他們可能都不是你想要 modeling 的對象,可以考慮從 dataset 中拿掉。

實務上,如果你有關於 user 或 item 的黑名單,例如一些 SPAM 帳號或 NSFW 的內容等,也可以在這個步驟把它們過濾掉。

from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import Param
import pyspark.sql.functions as F

class DataCleaner(Transformer):

    @keyword_only
    def __init__(self, minItemStargazersCount=None, maxItemStargazersCount=None, minUserStarredCount=None, maxUserStarredCount=None):
        super(DataCleaner, self).__init__()
        self.minItemStargazersCount = Param(self, 'minItemStargazersCount', '移除 stargazer 數低於這個數字的 item')
        self.maxItemStargazersCount = Param(self, 'maxItemStargazersCount', '移除 stargazer 數超過這個數字的 item')
        self.minUserStarredCount = Param(self, 'minUserStarredCount', '移除 starred repo 數低於這個數字的 user')
        self.maxUserStarredCount = Param(self, 'maxUserStarredCount', '移除 starred repo 數超過這個數字的 user')
        self._setDefault(minItemStargazersCount=1, maxItemStargazersCount=50000, minUserStarredCount=1, maxUserStarredCount=50000)
        kwargs = self.__init__._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, minItemStargazersCount=None, maxItemStargazersCount=None, minUserStarredCount=None, maxUserStarredCount=None):
        kwargs = self.setParams._input_kwargs
        return self._set(**kwargs)

    def setMinItemStargazersCount(self, value):
        self._paramMap[self.minItemStargazersCount] = value
        return self

    def getMinItemStargazersCount(self):
        return self.getOrDefault(self.minItemStargazersCount)

    def setMaxItemStargazersCount(self, value):
        self._paramMap[self.maxItemStargazersCount] = value
        return self

    def getMaxItemStargazersCount(self):
        return self.getOrDefault(self.maxItemStargazersCount)

    def setMinUserStarredCount(self, value):
        self._paramMap[self.minUserStarredCount] = value
        return self

    def getMinUserStarredCount(self):
        return self.getOrDefault(self.minUserStarredCount)

    def setMaxUserStarredCount(self, value):
        self._paramMap[self.maxUserStarredCount] = value
        return self

    def getMaxUserStarredCount(self):
        return self.getOrDefault(self.maxUserStarredCount)

    def _transform(self, ratingDF):
        minItemStargazersCount = self.getMinItemStargazersCount()
        maxItemStargazersCount = self.getMaxItemStargazersCount()
        minUserStarredCount = self.getMinUserStarredCount()
        maxUserStarredCount = self.getMaxUserStarredCount()

        toKeepItemsDF = ratingDF \
            .groupBy('item') \
            .agg(F.count('user').alias('stargazers_count')) \
            .where('stargazers_count >= {0} AND stargazers_count <= {1}'.format(minItemStargazersCount, maxItemStargazersCount)) \
            .orderBy('stargazers_count', ascending=False) \
            .select('item', 'stargazers_count')
        temp1DF = ratingDF.join(toKeepItemsDF, 'item', 'inner')

        toKeepUsersDF = temp1DF \
            .groupBy('user') \
            .agg(F.count('item').alias('starred_count')) \
            .where('starred_count >= {0} AND starred_count <= {1}'.format(minUserStarredCount, maxUserStarredCount)) \
            .orderBy('starred_count', ascending=False) \
            .select('user', 'starred_count')
        temp2DF = temp1DF.join(toKeepUsersDF, 'user', 'inner')

        cleanDF = temp2DF.select('user', 'item', 'rating', 'starred_at')
        return cleanDF

dataCleaner = DataCleaner(
    minItemStargazersCount=2,
    maxItemStargazersCount=4000,
    minUserStarredCount=2,
    maxUserStarredCount=5000
)
cleanDF = dataCleaner.transform(ratingDF)

cleanDF.agg(F.count('rating'), F.countDistinct('user'), F.countDistinct('item')).show()
# +-------------+--------------------+--------------------+
# |count(rating)|count(DISTINCT user)|count(DISTINCT item)|
# +-------------+--------------------+--------------------+
# |      2761118|               10472|              245626|
# +-------------+--------------------+--------------------+

Generate Negative Samples

對 implicit feedback 的 ALS 來說,手動加入負樣本(Rui = 0 的樣本)是沒有意義的,因為 missing value / non-observed value 對該演算法來說本來就是 0,表示用戶確實沒有對該物品做出行為,也就是 Pui = 0 沒有偏好,所以 Cui = 1 + alpha x 0 置信度也會比其他正樣本低。不過因為 Spark ML 的 ALS 只會計算 Rui > 0 的項目,所以即便你手動加入了 Rui = 0 或 Rui = -1 的負樣本,對整個模型其實沒有影響。

雖然沒有負樣本你就不能算 area under ROC curve 或是 area under Precision-Recall curve 等 binary classifier 用的指標,不過你可以改用 Learning to rank 的評估方式,例如 NDCG 或 Mean Average Precision 等。但是 ALS 的 loss function 也沒辦法直接優化 NDCG 這樣的指標就是了。

ref:
https://vinta.ws/code/generate-negative-samples-for-recommender-system.html

Split Data

因為 Matrix Factorization 需要考慮每個 user-item pair,如果你餵給 model 它沒見過的資料,它就沒辦法進行推薦(冷啟動問題)。只要 user 或 item 其中之一不存在於 dataset 裡,ALS model 所輸出的 prediction 值就會是 NaN。所以應該盡量保持每個 user 和 item 都出現在 training set 和 testing set 裡,例如隨機挑出每個 user 的任意 n 個或 n 比例的評分作為 test set,剩下的評分當作 training set(俗稱 leave-n-out)。如果使用 Machine Learning 中常見的 holdout 方式,隨機地把所有 data point 分散到 training set 和 test set(例如 df.randomSplit([0.7, 0.3])),會有很高的機率造成部分 user 或 item 只出現在其中一組 dataset 裡。

ref:
https://jessesw.com/Rec-System/
http://blog.ethanrosenthal.com/2016/10/19/implicit-mf-part-1/

從 LibRec 的文件上也可以發現還有許多拆分數據的方式,例如:

  • 基于 Ratio 的分类方法为通过给定的比例来将数据分为两部分。这个分类过程可以在所有数据中进行随机分类,也可以在用户或者物品维度上进行分类。当有时间的特征时,可以根据时间顺序留出最后一定比例的数据来进行测试。
  • LooCV 的分割方法为 leave-one-user/item/rating-out,也就是随机选取每个 user 的任意一个 item 或者每个 item 的任意一个 user 作为测试数据,余下的数据来作为训练数据。在实现中实现了基于 User 和基于 Item 的多种分类方式。
  • GivenN 分割方法是指为每个用户留出指定数目 N 的数据来作为测试用例,余下的样本作为训练数据。
  • KCV 即 K 折交叉验证。将数据分割为 K 份,在每次执行时选择其中一份作为测试数据,余下的数据作为训练数据,共执行 K 次。综合 K 次的训练结果来对推荐算法的性能进行评估。

ref:
https://www.librec.net/dokuwiki/doku.php?id=DataModel_zh#splitter

這裡我們用 sampleBy() 簡單地寫了一個根據 user 來隨機劃分 item 到 training set 和 test set 的方法。

def randomSplitByUser(df, weights, seed=None):
    trainingRation = weights[0]
    fractions = {row['user']: trainingRation for row in df.select('user').distinct().collect()}
    training = df.sampleBy('user', fractions, seed)
    testRDD = df.rdd.subtract(training.rdd)
    test = spark.createDataFrame(testRDD, df.schema)
    return training, test

training, test = randomSplitByUser(ratingDF, weights=[0.7, 0.3])

ref:
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.sampleBy

Train the Model

from pyspark.ml.recommendation import ALS

als = ALS(implicitPrefs=True, seed=42) \
    .setRank(50) \
    .setMaxIter(22) \
    .setRegParam(0.5) \
    .setAlpha(40)

alsModel = als.fit(training)

# 這些就是訓練出來的 user 和 item 的 Latent Factors
alsModel.userFactors.show()
alsModel.itemFactors.show()

ref:
https://spark.apache.org/docs/latest/ml-collaborative-filtering.html
https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.recommendation.ALS

Predict Preferences

from pyspark.ml import Transformer

predictedDF = alsModel.transform(testing)

class PredictionProcessor(Transformer):

    def _transform(self, predictedDF):
        nonNullDF = predictedDF.dropna(subset=['prediction', ])
        predictionDF = nonNullDF.withColumn('prediction', nonNullDF['prediction'].cast('double'))
        return predictionDF

# 刪掉那些 NaN 的數據
predictionProcessor = PredictionProcessor()
predictionDF = predictionProcessor.transform(predictedDF)

Evaluate the Model

因為 Spark ML 沒有提供給 DataFrame 用的 ranking evaluator,我們只好自己寫一個,但是內部還是使用 Spark MLlib 的 RankingMetrics。不過這個只是 offline 的評估方式而已,等到要實際上線的時候可能還需要做 A/B testing。

from pyspark import keyword_only
from pyspark.ml.evaluation import Evaluator
from pyspark.ml.param.shared import Param
from pyspark.mllib.evaluation import RankingMetrics
from pyspark.sql import Window
from pyspark.sql.functions import col
from pyspark.sql.functions import expr
import pyspark.sql.functions as F

class RankingEvaluator(Evaluator):

    @keyword_only
    def __init__(self, k=None):
        super(RankingEvaluator, self).__init__()
        self.k = Param(self, 'k', 'Top K')
        self._setDefault(k=30)
        kwargs = self.__init__._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, k=None):
        kwargs = self.setParams._input_kwargs
        return self._set(**kwargs)

    def isLargerBetter(self):
        return True

    def setK(self, value):
        self._paramMap[self.k] = value
        return self

    def getK(self):
        return self.getOrDefault(self.k)

    def _evaluate(self, predictedDF):
        k = self.getK()

        predictedDF.show()

        windowSpec = Window.partitionBy('user').orderBy(col('prediction').desc())
        perUserPredictedItemsDF = predictedDF \
            .select('user', 'item', 'prediction', F.rank().over(windowSpec).alias('rank')) \
            .where('rank <= {0}'.format(k)) \
            .groupBy('user') \
            .agg(expr('collect_list(item) as items'))

        windowSpec = Window.partitionBy('user').orderBy(col('starred_at').desc())
        perUserActualItemsDF = predictedDF \
            .select('user', 'item', 'starred_at', F.rank().over(windowSpec).alias('rank')) \
            .where('rank <= {0}'.format(k)) \
            .groupBy('user') \
            .agg(expr('collect_list(item) as items'))

        perUserItemsRDD = perUserPredictedItemsDF.join(F.broadcast(perUserActualItemsDF), 'user', 'inner') \
            .rdd \
            .map(lambda row: (row[1], row[2]))

        if perUserItemsRDD.isEmpty():
            return 0.0

        rankingMetrics = RankingMetrics(perUserItemsRDD)
        metric = rankingMetrics.ndcgAt(k)
        return metric

k = 30
rankingEvaluator = RankingEvaluator(k=k)
ndcg = rankingEvaluator.evaluate(predictionDF)
print('NDCG', ndcg)

ref:
https://spark.apache.org/docs/latest/mllib-evaluation-metrics.html

Recommend Items

實際感受一下推薦系統的效果如何。這裡是直接把結果 print 出來,而沒有把推薦結果儲存到資料庫。不過通常不會直接就把推薦系統輸出的東西展示給用戶,會先經過一些過濾、排序和產生推薦理由等等的步驟,或是加入一些人為的規則,比如說強制插入廣告、最近主打的商品或是過濾掉那些很多人點擊但是其實質量並不怎麼樣的東西。當然也有可能會把這個推薦系統的輸出作為其他機器學習 model 的輸入。

ref:
https://www.zhihu.com/question/28247353

def recommendItems(rawDF, alsModel, username, topN=30, excludeKnownItems=False):
    userID = rawDF \
        .where('from_username = "{0}"'.format(username)) \
        .select('from_user_id') \
        .take(1)[0]['from_user_id']

    userItemsDF = alsModel \
        .itemFactors. \
        selectExpr('{0} AS user'.format(userID), 'id AS item')
    if excludeKnownItems:
        userKnownItemsDF = rawDF \
            .where('from_user_id = {0}'.format(userID)) \
            .selectExpr('repo_id AS item')
        userItemsDF = userItemsDF.join(userKnownItemsDF, 'item', 'left_anti')

    userPredictedDF = alsModel \
        .transform(userItemsDF) \
        .select('item', 'prediction') \
        .orderBy('prediction', ascending=False) \
        .limit(topN)

    repoDF = rawDF \
        .groupBy('repo_id', 'repo_full_name', 'repo_language') \
        .agg(F.max('stargazers_count').alias('stargazers_count'))

    recommendedItemsDF = userPredictedDF \
        .join(repoDF, userPredictedDF['item'] == repoDF['repo_id'], 'inner') \
        .select('prediction', 'repo_full_name', 'repo_language', 'stargazers_count') \
        .orderBy('prediction', ascending=False)

    return recommendedItemsDF

k = 30
username = 'vinta'
recommendedItemsDF = recommendItems(rawDF, alsModel, username, topN=k, excludeKnownItems=False)
for item in recommendedItemsDF.collect():
    repoName = item['repo_full_name']
    repoUrl = 'https://github.com/{0}'.format(repoName)
    print(repoUrl, item['prediction'], item['repo_language'], item['stargazers_count'])

ref:
https://github.com/vinta/albedo/blob/master/src/main/python/train_als.ipynb

Cross-validate Models

使用 Spark ML 的 pipeline 來做 cross-validation,選出最適合的 hyperparameters 組合。

  • rank: The number of latent factors in the model, or equivalently, the number of columns k in the user-feature and product-feature matrices.
  • regParam: A standard overfitting parameter, also usually called lambda. Higher values resist overfitting, but values that are too high hurt the factorization’s accuracy.
  • alpha: Controls the relative weight of observed versus unobserved user-product interactions in the factorization.
  • maxIter: The number of iterations that the factorization runs. More iterations take more time but may produce a better factorization.
dataCleaner = DataCleaner()

als = ALS(implicitPrefs=True, seed=42)

predictionProcessor = PredictionProcessor()

pipeline = Pipeline(stages=[
    dataCleaner,
    als,
    predictionProcessor,
])

paramGrid = ParamGridBuilder() \
    .addGrid(dataCleaner.minItemStargazersCount, [1, 10, 100]) \
    .addGrid(dataCleaner.maxItemStargazersCount, [4000, ]) \
    .addGrid(dataCleaner.minUserStarredCount, [1, 10, 100]) \
    .addGrid(dataCleaner.maxUserStarredCount, [1000, 4000, ]) \
    .addGrid(als.rank, [50, 100]) \
    .addGrid(als.regParam, [0.01, 0.1, 0.5]) \
    .addGrid(als.alpha, [0.01, 0.89, 1, 40, ]) \
    .addGrid(als.maxIter, [22, ]) \
    .build()

rankingEvaluator = RankingEvaluator(k=30)

cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=paramGrid,
                    evaluator=rankingEvaluator,
                    numFolds=2)

cvModel = cv.fit(ratingDF)

def printCrossValidationParameters(cvModel):
    metric_params_pairs = list(zip(cvModel.avgMetrics, cvModel.getEstimatorParamMaps()))
    metric_params_pairs.sort(key=lambda x: x[0], reverse=True)
    for pair in metric_params_pairs:
        metric, params = pair
        print('metric', metric)
        for k, v in params.items():
            print(k.name, v)
        print('')

printCrossValidationParameters(cvModel)

ref:
https://spark.apache.org/docs/latest/ml-pipeline.html

Spark ML cookbook (Python)

Spark ML cookbook (Python)

Calculate percentage of sparsity of a user-item rating DataFrame

result = ratingDF.agg(F.count('rating'), F.countDistinct('user'), F.countDistinct('item')).collect()[0]
totalUserCount = result['count(DISTINCT user)']
totalItemCount = result['count(DISTINCT item)']
zonZeroRatingCount = result['count(rating)']

density = (zonZeroRatingCount / (totalUserCount * totalItemCount)) * 100
sparsity = 100 - density

Recommend items for single user

topN = 30
userID = 123

userItemsDF = alsModel \
    .itemFactors. \
    selectExpr('{0} AS user'.format(userID), 'id AS item')
userPredictedDF = alsModel \
    .transform(userItemsDF) \
    .select('item', 'prediction') \
    .orderBy('prediction', ascending=False) \
    .limit(topN)

ref:
https://www.safaribooksonline.com/library/view/advanced-analytics-with/9781491972946/ch03.html
https://www.safaribooksonline.com/library/view/building-a-recommendation/9781785282584/ch06s04.html
https://www.safaribooksonline.com/library/view/building-recommendation-engines/9781785884856/ch07s05.html

Recommend items for every user (a slightly fast way)

import numpy
from numpy import *

myModel = MatrixFactorizationModel.load(sc, 'BingBong')
m1 = myModel.productFeatures()
m2 = m1.map(lambda (product, feature): feature).collect()
m3 = matrix(m2).transpose()
pf = sc.broadcast(m3)
uf = myModel.userFeatures().coalesce(100)

# get predictions on all user
f1 = uf.map(lambda (userID, features): (userID, squeeze(asarray(matrix(array(features)) * pf.value))))

ref:
https://www.slideshare.net/SparkSummit/26-trillion-app-recomendations-using-100-lines-of-spark-code-ayman-farahat

Evaluate a binary classification

from pyspark.ml.evaluation import BinaryClassificationEvaluator

matrix = [
    (0.5, 1),
    (2.0, 1),
    (0.8, 1),
    (0.2, 0),
    (0.1, 0),
    (0.4, 0),
]
predictions = spark.createDataFrame(matrix, ['prediction', 'label'])
predictions = predictions.withColumn('prediction', predictions['prediction'].cast('double'))

evaluator = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='label', metricName='areaUnderROC')
evaluator.evaluate(predictions)

Calculate ranking metrics

from pyspark.mllib.evaluation import RankingMetrics
from pyspark.sql import Window
from pyspark.sql.functions import col, expr
import pyspark.sql.functions as F

k = 10

windowSpec = Window.partitionBy('user').orderBy(col('prediction').desc())
perUserPredictedItemsDF = outputDF \
    .select('user', 'item', 'prediction', F.rank().over(windowSpec).alias('rank')) \
    .where('rank <= {0}'.format(k)) \
    .groupBy('user') \
    .agg(expr('collect_list(item) as items'))
perUserPredictedItemsDF.show()
# +--------+--------------------+
# |    user|               items|
# +--------+--------------------+
# |    2142|[36560369, 197450...|
# |   47217|[40693501, 643554...|
# +--------+--------------------+

windowSpec = Window.partitionBy('from_user_id').orderBy(col('starred_at').desc())
perUserActualItemsDF = rawDF \
    .select('from_user_id', 'repo_id', 'starred_at', F.rank().over(windowSpec).alias('rank')) \
    .where('rank <= {0}'.format(k)) \
    .groupBy('from_user_id') \
    .agg(expr('collect_list(repo_id) as items')) \
    .withColumnRenamed('from_user_id', 'user')
# +--------+--------------------+
# |    user|               items|
# +--------+--------------------+
# |    2142|[29122050, 634846...|
# |   59990|[9820191, 8729416...|
# +--------+--------------------+

perUserItemsRDD = perUserPredictedItemsDF.join(perUserActualItemsDF, 'user') \
    .rdd \
    .map(lambda row: (row[1], row[2]))
rankingMetrics = RankingMetrics(perUserItemsRDD)

print(rankingMetrics.meanAveragePrecision)
print(rankingMetrics.precisionAt(k))
print(rankingMetrics.ndcgAt(k))

ref:
https://www.safaribooksonline.com/library/view/spark-the-definitive/9781491912201/ch19.html

Create a custom Transformer

from pyspark.ml import Transformer

class PredictionProcessor(Transformer):

    def _transform(self, predictedDF):
        nonNullDF = predictedDF.dropna(subset=['prediction', ])
        predictionDF = nonNullDF.withColumn('prediction', nonNullDF['prediction'].cast('double'))
        return predictionDF

ref:
http://stackoverflow.com/questions/32331848/create-a-custom-transformer-in-pyspark-ml

RankingMetrics

結果介於 0 ~ 1 之間,分數越大越好。

from pyspark.mllib.evaluation import RankingMetrics

predictionAndLabels = sc.parallelize([
    ([1, 2, 3, 4], [1, 2, 3, 4]),
    ([1, ], [1, 10]),
    ([6, 4, 2], [6, 2, 100, 8, 2, 55]),
])
metrics = RankingMetrics(predictionAndLabels)
metrics.ndcgAt(5)

ref:
https://spark.apache.org/docs/latest/mllib-evaluation-metrics.html#ranking-systems

Create a custom Evaluator

from pyspark import keyword_only
from pyspark.ml.evaluation import Evaluator
from pyspark.ml.param.shared import Param
from pyspark.mllib.evaluation import RankingMetrics
from pyspark.sql import Window
from pyspark.sql.functions import col
from pyspark.sql.functions import expr
import pyspark.sql.functions as F

class RankingEvaluator(Evaluator):

    @keyword_only
    def __init__(self, k=None):
        super(RankingEvaluator, self).__init__()
        self.k = Param(self, 'k', 'Top K')
        self._setDefault(k=30)
        kwargs = self.__init__._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, k=None):
        kwargs = self.setParams._input_kwargs
        return self._set(**kwargs)

    def isLargerBetter(self):
        return True

    def setK(self, value):
        self._paramMap[self.k] = value
        return self

    def getK(self):
        return self.getOrDefault(self.k)

    def _evaluate(self, outputDF):
        k = self.getK()

        windowSpec = Window.partitionBy('user').orderBy(col('prediction').desc())
        perUserPredictedItemsDF = outputDF \
            .select('user', 'item', 'prediction', F.rank().over(windowSpec).alias('rank')) \
            .where('rank <= {0}'.format(k)) \
            .groupBy('user') \
            .agg(expr('collect_list(item) as items'))

        windowSpec = Window.partitionBy('user').orderBy(col('starred_at').desc())
        perUserActualItemsDF = outputDF \
            .select('user', 'item', 'starred_at', F.rank().over(windowSpec).alias('rank')) \
            .where('rank <= {0}'.format(k)) \
            .groupBy('user') \
            .agg(expr('collect_list(item) as items'))

        perUserItemsRDD = perUserPredictedItemsDF.join(F.broadcast(perUserActualItemsDF), 'user', 'inner') \
            .rdd \
            .map(lambda row: (row[1], row[2]))
        rankingMetrics = RankingMetrics(perUserItemsRDD)
        metric = rankingMetrics.ndcgAt(k)
        return metric

Show best parameters from a CrossValidatorModel

metric_params_pairs = list(zip(cvModel.avgMetrics, cvModel.getEstimatorParamMaps()))
metric_params_pairs.sort(key=lambda x: x[0], reverse=True)
best_metric_params = metric_params_pairs[0][1]
for pair in metric_params_pairs:
    metric, params = pair
    print('metric', metric)
    for k, v in params.items():
        print(k.name, v)
    print('')
# metric 0.5273636632856705
# rank 50
# regParam 0.01
# maxIter 10
# alpha 1

ref:
https://stackoverflow.com/questions/31749593/how-to-extract-best-parameters-from-a-crossvalidatormodel
https://stackoverflow.com/questions/39529012/pyspark-get-all-parameters-of-models-created-with-paramgridbuilder

Show best parameters from a TrainValidationSplitModel

metric_params_pairs = list(zip(tvModel.validationMetrics, tvModel.getEstimatorParamMaps()))
metric_params_pairs.sort(key=lambda x: x[0], reverse=True)
for pair in metric_params_pairs:
    metric, params = pair
    print('metric', metric)
    for k, v in params.items():
        print(k.name, v)
    print('')
# metric 0.5385481418189484
# rank 50
# regParam 0.1
# maxIter 20
# alpha 1