Setup Spark on macOS

Install

First, you need the Java 8 JDK.
http://www.oracle.com/technetwork/java/javase/downloads/index.html

$ java -version
java version "1.8.0_131"

Homebrew Version

$ brew update
$ brew install maven apache-spark

Pre-built Version

$ mkdir -p /usr/local/share/apache-spark && \
  cd /usr/local/share/apache-spark && \
  wget https://archive.apache.org/dist/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz && \
  tar -xvzf spark-2.2.0-bin-hadoop2.7.tgz

ref:
http://spark.apache.org/downloads.html

Build Version

This is the recommended way.

$ brew install [email protected]
$ export PATH="/usr/local/opt/[email protected]/bin:$PATH"
$ scala -version
Scala code runner version 2.11.8 -- Copyright 2002-2016, LAMP/EPFL

$ mkdir -p /usr/local/share/apache-spark && \
  cd /usr/local/share/apache-spark && \
  wget https://d3kbcqa49mib13.cloudfront.net/spark-2.2.0.tgz && \
  tar -xvzf spark-2.2.0.tgz && \
  cd spark-2.2.0

$ ./build/mvn -Pnetlib-lgpl -Pyarn -Phadoop-2.7 -Dhadoop.version=2.7.3 -DskipTests -T 4C package
# or
$ ./build/mvn -Pnetlib-lgpl -Pyarn -Phadoop-2.7 -Dhadoop.version=2.7.3 -DskipTests clean package
$ spark-shell --packages "com.github.fommil.netlib:all:1.1.2"
scala> import com.github.fommil.netlib.BLAS
import com.github.fommil.netlib.BLAS
scala> BLAS.getInstance().getClass().getName()
res1: String = com.github.fommil.netlib.NativeSystemBLAS

ref:
http://spark.apache.org/downloads.html
http://spark.apache.org/docs/latest/building-spark.html
http://spark.apache.org/docs/latest/ml-guide.html#dependencies

Configurations

in .zshrc

if which java > /dev/null; then
  export JAVA_HOME="$(/usr/libexec/java_home -v 1.8)"
  export PATH="$JAVA_HOME/bin:$PATH"
fi

export PATH="/usr/local/opt/[email protected]/bin:$PATH"

# homebrew version
export SPARK_HOME="/usr/local/Cellar/apache-spark/2.2.0/libexec"
export PYTHONPATH="$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.4-src.zip:$PYTHONPATH"
export PYSPARK_DRIVER_PYTHON="ipython"

# pre-built version
export SPARK_HOME="/usr/local/share/apache-spark/spark-2.2.0-bin-hadoop2.7"
export PATH="$SPARK_HOME/bin:$PATH"
export PYTHONPATH="$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.4-src.zip:$PYTHONPATH"

# build version
export SPARK_HOME="/usr/local/share/apache-spark/spark-2.2.0"
export PATH="$SPARK_HOME/bin:$PATH"
export PYTHONPATH="$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.4-src.zip:$PYTHONPATH"

ref:
https://spark.apache.org/docs/latest/programming-guide.html
https://spark.apache.org/docs/latest/configuration.html

$ cd $SPARK_HOME

$ cp conf/spark-defaults.conf.template conf/spark-defaults.conf
spark.driver.memory              4g
spark.executor.memory            4g
spark.jars.packages              com.github.fommil.netlib:all:1.1.2,mysql:mysql-connector-java:5.1.41
spark.serializer                 org.apache.spark.serializer.KryoSerializer
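
These defaults apply to every spark-shell / pyspark / spark-submit session. A quick way to confirm they were picked up, from inside spark-shell:

// values loaded from conf/spark-defaults.conf end up in the SparkConf
sc.getConf.get("spark.serializer")     // org.apache.spark.serializer.KryoSerializer
sc.getConf.get("spark.driver.memory")  // 4g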

$ cp conf/spark-env.sh.template conf/spark-env.sh
export PYTHONHASHSEED=42

$ cp conf/log4j.properties.template conf/log4j.properties
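
The template logs at INFO, which is quite noisy; a common tweak in conf/log4j.properties is to raise the root level:

log4j.rootCategory=WARN, console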

ref:
https://spark.apache.org/docs/latest/configuration.html

Commands

Local Mode

$ spark-shell

$ export PYSPARK_DRIVER_PYTHON="jupyter" && \
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --ip 0.0.0.0" && \
pyspark \
--packages "com.github.fommil.netlib:all:1.1.2,mysql:mysql-connector-java:5.1.41" \
--driver-memory 4g \
--executor-memory 4g \
--master "local[*]"

$ spark-shell \
--packages "com.github.fommil.netlib:all:1.1.2,mysql:mysql-connector-java:5.1.41"
--master "local-cluster[3, 1, 4096]"

# Spark Application UI on the driver
$ open http://localhost:4040/
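
Once a shell is up, a tiny job is enough to confirm the local master works (a sketch; any small action will do):

// run inside spark-shell
val nums = sc.parallelize(1 to 1000)
nums.filter(_ % 2 == 0).count()  // 500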

ref:
https://spark.apache.org/docs/latest/programming-guide.html

Standalone Mode

There are two deploy modes for Spark Standalone. In client mode, the driver is launched in the same process as the client that submits the application. In cluster mode, the driver is instead launched on one of the worker nodes. The examples below use client mode, which is the default.

$ ./sbin/start-master.sh -h localhost
$ ./sbin/start-slave.sh spark://localhost:7077

# Spark Web UI on the cluster manager
$ open http://localhost:8080/

$ pyspark \
--driver-memory 4g \
--executor-memory 4g \
--master spark://localhost:7077

$ spark-submit \
--master spark://localhost:7077 \
examples/src/main/python/pi.py 10

$ spark-submit \
--driver-memory 2g \
--driver-java-options "-XX:ThreadStackSize=81920" \
--total-executor-cores 3 \
--executor-cores 3 \
--executor-memory 12g \
--conf "spark.executor.extraJavaOptions=-XX:ThreadStackSize=81920" \
--master spark://localhost:7077 \
--packages "mysql:mysql-connector-java:5.1.41,com.hankcs:hanlp:portable-1.3.4,edu.stanford.nlp:stanford-corenlp:3.7.0" \
--jars "/Users/vinta/Projects/albedo/spark-data/stanford-corenlp-3.8.0-models.jar" \
--class ws.vinta.albedo.LogisticRegressionRanker \
target/albedo-1.0.0-SNAPSHOT.jar

# Spark Application UI on the driver
$ open http://localhost:4040/

ref:
https://spark.apache.org/docs/latest/spark-standalone.html
https://spark.apache.org/docs/latest/submitting-applications.html
https://spark.apache.org/docs/latest/configuration.html

Recommender System: Matrix Factorization

Singular Value Decomposition (SVD)

        item_a  item_b  item_c
user_1  2       -       -
user_2  -       1       -
user_3  3       3       -
user_4  -       2       2

Matrix factorization is applied to the m x n rating matrix. One of the most common methods is Singular Value Decomposition (SVD), which decomposes the rating matrix into three matrices U, S and V*.
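
MLlib can compute a truncated SVD directly on a distributed matrix. A minimal sketch in spark-shell, using the toy matrix above with missing ratings filled with 0 (which is exactly the density assumption discussed below):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix

// rows of the 4 x 3 rating matrix, missing entries as 0.0
val rows = sc.parallelize(Seq(
  Vectors.dense(2.0, 0.0, 0.0),
  Vectors.dense(0.0, 1.0, 0.0),
  Vectors.dense(3.0, 3.0, 0.0),
  Vectors.dense(0.0, 2.0, 2.0)
))
val mat = new RowMatrix(rows)

// keep the top 2 singular values: R ≈ U * S * V^T
val svd = mat.computeSVD(2, computeU = true)
println(svd.s)  // the singular values S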

Applications of SVD in Recommender Systems
http://yanyiwu.com/work/2012/09/10/SVD-application-in-recsys.html

Singular Value Decomposition (SVD): Principles and Its Application in Dimensionality Reduction
http://www.cnblogs.com/pinard/p/6251584.html

Alternating Least Squares (ALS)

Because SVD requires the matrix to be dense, while the rating matrix in a recommender system is usually a huge sparse matrix, in practice a variant of SVD called Funk-SVD is used instead. Funk-SVD is also known as the Latent Factor Model (LFM): it factorizes the m x n rating matrix R into two matrices U and I, an m x k user matrix and a k x n item matrix, each with k latent features, and only the non-zero ratings are taken into account when fitting.

ALS is one way to solve for the U and I matrices; another common approach is Stochastic Gradient Descent (SGD).
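
Spark ML implements ALS as org.apache.spark.ml.recommendation.ALS. A minimal explicit-rating sketch for spark-shell, built from the toy matrix above (the DataFrame column names here are just placeholders):

import org.apache.spark.ml.recommendation.ALS

// (userId, itemId, rating) triples for the non-missing cells above
val ratings = spark.createDataFrame(Seq(
  (1, 1, 2.0), (2, 2, 1.0), (3, 1, 3.0),
  (3, 2, 3.0), (4, 2, 2.0), (4, 3, 2.0)
)).toDF("userId", "itemId", "rating")

val als = new ALS()
  .setMaxIter(10)     // iteration
  .setRank(2)         // k, the number of latent factors
  .setRegParam(0.01)  // lambda
  .setUserCol("userId").setItemCol("itemId").setRatingCol("rating")

val model = als.fit(ratings)
model.itemFactors.show()  // the k-dimensional item factor matrix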

The ALS Implementation in Spark MLlib
http://www.csdn.net/article/2015-05-07/2824641

Matrix Factorization in Collaborative Filtering Recommendation Algorithms
http://www.cnblogs.com/pinard/p/6351319.html

Latent Factor Models Based on Matrix Factorization
http://www.voidcn.com/blog/winone361/article/p-5031282.html

ALS for implicit feedback

This model is also known as Weighted Regularized Matrix Factorization (WRMF).

        item_a  item_b  item_c
user_1  1       -       -
user_2  -       1       -
user_3  1       1       -
user_4  -       1       1

Notation:

  • Rui = user u's interaction with item i
  • Xu = user u's vector
  • Yi = item i's vector
  • Pui = 1 if Rui > 0 else 0
  • Cui = 1 + alpha x Rui

Parameters:

  • iteration: number of iterations
  • rank: number of latent factors
  • lambda: regularization parameter
  • alpha: confidence weight

As with explicit ALS, the m x n implicit feedback matrix R is factorized into two matrices X and Y, an m x k user latent factor matrix and a k x n item latent factor matrix. The difference is that two additional matrices are introduced: an m x n binary preference matrix P, where Pui indicates whether user u is interested in item i, and an m x n confidence matrix C (think of it as strength), where Cui indicates how confident we are that user u is interested in item i. The goal is still to compute the user and item matrices; compared with explicit ALS, the loss function additionally incorporates Cui and Pui, and every user-item pair is considered, including missing / non-observed values (Rui = 0).
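
Written out, the objective that implicit ALS minimizes (the formulation from the Hu, Koren & Volinsky paper linked below) is:

\min_{X, Y} \sum_{u,i} c_{ui} (p_{ui} - x_u^\top y_i)^2 + \lambda \left( \sum_u \| x_u \|^2 + \sum_i \| y_i \|^2 \right)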

As long as a user has interacted with an item (Rui > 0), we assume the user has a preference for it (Pui = 1); the more interactions (the larger Rui), the larger Cui becomes, meaning higher confidence in that assumption. If the user has never interacted with the item (Rui = 0), we assume no preference (Pui = 0), but since Cui = 1 + alpha x 0, this assumption carries only the minimum confidence and is considered much less reliable.

Explicit ALS predicts the rating a user would give an item; implicit ALS instead predicts whether a user will be interested in an item: Pui = Xu x Yi.T, so the predictions that implicit ALS outputs are really the P matrix reconstructed from X x Y.T. These predictions mostly fall within [0, 1], although they can be greater than 1 or less than 0; the larger the value, the stronger the user's preference for the item, and vice versa.
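
In Spark ML the same ALS estimator covers the implicit case; compared with the explicit sketch above, the relevant knobs are setImplicitPrefs and setAlpha (again a sketch, with assumed column names):

import org.apache.spark.ml.recommendation.ALS

val als = new ALS()
  .setImplicitPrefs(true)  // switch to the Cui / Pui formulation
  .setAlpha(40.0)          // alpha, the confidence weight
  .setRank(50)             // number of latent factors
  .setRegParam(0.01)       // lambda
  .setMaxIter(10)
  .setUserCol("userId").setItemCol("itemId").setRatingCol("rating")

// als.fit(interactions).transform(interactions) then yields predictions that
// approximate Pui: mostly in [0, 1], but not strictly bounded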

Since ALS is a collaborative filtering method, it also suffers from the cold start problem and cannot make recommendations for new users or new items. In addition, ALS lacks interpretability, and it is hard to incorporate side information (features of the users and items themselves, such as gender, genre, or category). On the other hand, because matrix factorization implicitly performs a kind of clustering, it is relatively robust to noise compared with other approaches.

Collaborative Filtering for Implicit Feedback Datasets
http://yifanhu.net/PUB/cf.pdf

A Gentle Introduction to Recommender Systems with Implicit Feedback
https://jessesw.com/Rec-System/

Intro to Implicit Matrix Factorization: Classic ALS
http://blog.ethanrosenthal.com/2016/10/19/implicit-mf-part-1/

An Analysis of Spark ML Algorithm Internals
https://github.com/endymecy/spark-ml-source-analysis