Spark RDD methods (Python / Scala)

Some of the methods below are available on every RDD, while others only exist on PairRDDs. Because I have used Spark's Python API and Scala API in different projects, the examples below mix both. With the Scala API you have to pay close attention to the types each method accepts and returns; the Python API has no such constraint, since Python is a dynamic language.

ref:
http://spark.apache.org/docs/latest/programming-guide.html#transformations
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions

RDD Methods

map()

Function signature: def map[U](f: (T) ⇒ U): RDD[U]

The function passed to map() receives each element of the RDD (each row, from a DataFrame point of view), i.e. func(row), and returns an arbitrary object (for example an int, a string or a tuple). map() therefore produces an RDD with the same number of rows, but possibly with a different element type.

matrix = [
    (1, 1, 1),
    (1, 2, 1),
    (1, 3, 1),
    (1, 6, 0),
    (2, 6, 1),
    (3, 1, 1),
    (3, 5, 1),
    (4, 1, 0),
    (4, 4, 1),
]
df = spark.createDataFrame(matrix, ['k', 'v', 'whatever'])

df.rdd.map(lambda row: (row[0], row[1])).collect()
# [(1, 1), (1, 2), (1, 3), (1, 6), (2, 6), (3, 1), (3, 5), (4, 1), (4, 4)]

import org.apache.spark.rdd.RDD

val rdd: RDD[(String, Int)] = sc.parallelize(Seq(
  ("Python", 1),
  ("Scala", 3),
  ("Java", 2)
))
val test: RDD[String] = rdd.map({
  case (lang, count) => {
    s"${lang}_${count}"
  }
})
test.collect().foreach(println)
// Python_1
// Scala_3
// Java_2

flatMap()

Function signature: def flatMap[U](f: (T) ⇒ TraversableOnce[U]): RDD[U]

flatMap() is very similar to map(): the function it takes also receives each row of the RDD, i.e. func(row). The difference is that the function passed to flatMap() must return an Iterable (for example a tuple or a list, which may be empty), and flatMap() flattens the returned results. The count() after flatMap() may therefore be larger or smaller than that of the original RDD.

matrix = [
    (1, 1, 1),
    (1, 2, 1),
    (1, 3, 1),
    (1, 6, 0),
    (2, 6, 1),
    (3, 1, 1),
    (3, 5, 1),
    (4, 1, 0),
    (4, 4, 1),
]
df = spark.createDataFrame(matrix, ['k', 'v', 'whatever'])

df.rdd.flatMap(lambda row: (row[0], row[1])).collect()
# [1, 1, 1, 2, 1, 3, 1, 6, 2, 6, 3, 1, 3, 5, 4, 1, 4, 4]

df.rdd.flatMap(lambda row: (row[2], )).collect()
# [1, 1, 1, 0, 1, 1, 1, 0, 1]

ref:
http://backtobazics.com/big-data/spark/apache-spark-flatmap-example/
http://apachesparkbook.blogspot.tw/2015/05/difference-between-map-flatmap-in-rdd.html
http://stackoverflow.com/questions/42867457/map-each-element-of-a-list-in-spark
http://stackoverflow.com/questions/21096432/list-or-iterator-of-tuples-returned-by-map-pyspark

reduce()

Function signature: def reduce(f: (T, T) ⇒ T): T

The function passed to reduce() takes two elements of the RDD at a time, i.e. func(element1, element2), and returns a single object of the same type as its inputs. The whole reduce() eventually produces a single value.

array = [
    1,
    5,
    4,
    2,
    3,
]
rdd = sc.parallelize(array)

rdd.reduce(lambda element1, element2: element1 + element2)
# 15

def max(element1, element2):
    return element1 if element1 > element2 else element2

rdd.reduce(max)
# 5

treeReduce()

Function signature: def treeReduce(f: (T, T) ⇒ T, depth: Int = 2): T

A plain reduce() sends the partial result of every partition straight back to the driver for the final computation, which can become a bottleneck when there are many partitions or each partition holds a lot of data. In that case you can switch to treeReduce(), although using it inappropriately can backfire. The same relationship holds between aggregate() and treeAggregate().
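
For example, a minimal PySpark sketch (the data size and depth are arbitrary):

rdd = sc.parallelize(range(1000), 100)

# same result as rdd.reduce(), but partial results are merged
# in a multi-level tree on the executors before reaching the driver
rdd.treeReduce(lambda element1, element2: element1 + element2, depth=3)
# 499500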

ref:
https://umbertogriffo.gitbooks.io/apache-spark-best-practices-and-tuning/content/treereduce_and_treeaggregate_demystified.html

aggregate()

Function signature: def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U): U

import org.apache.spark.rdd.RDD

case class WikipediaArticle(title: String, text: String) {
  def mentionsLanguage(lang: String): Boolean = text.split(' ').contains(lang)
}

val wikiRdd: RDD[WikipediaArticle] = sc.parallelize(Seq(
  WikipediaArticle("a1", "abc Scala xyz"),
  WikipediaArticle("a2", "abc Python xyz"),
  WikipediaArticle("a3", "abc Scala xyz"),
  WikipediaArticle("a4", "abc Scala xyz")
))

def occurrencesOfLang(lang: String, rdd: RDD[WikipediaArticle]): Int = {
  val accumulateOp = (total: Int, a: WikipediaArticle) => total + 1
  val combineOp = (count1: Int, count2: Int) => count1 + count2
  rdd.filter(_.mentionsLanguage(lang)).aggregate(0)(accumulateOp, combineOp)
}

occurrencesOfLang("Scala", wikiRdd)
// 3

ref:
https://stackoverflow.com/questions/28240706/explain-the-aggregate-functionality-in-spark

PairRDD Methods

A PairRDD is an RDD of key/value pairs, i.e. RDD[(K, V)]. K and V can be primitive types, but they can also be other objects or collections. The methods in PairRDDFunctions mostly return an RDD[(K, V)] as well.

ref:
https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html

mapValues()

Function signature: def mapValues[U](f: (V) ⇒ U): RDD[(K, U)]

The function passed to mapValues() takes a single argument: the V of the PairRDD. It only operates on the values; the keys are left untouched.

val rdd = sc.parallelize(Seq(
  ("Python", 1),
  ("Scala", 3),
  ("Java", 2)
))

rdd
  .mapValues((count: Int) => {
    count * 10
  })
  .collect()
// Array((Python,10), (Scala,30), (Java,20))

ref:
http://apachesparkbook.blogspot.tw/2015/12/mapvalues-example.html

groupByKey()

Function signature: def groupByKey(): RDD[(K, Iterable[V])]

You should avoid using groupByKey() in the following situations:

  • If the operation is expressed as groupByKey followed by an associative and commutative reduction on the values (sum, count, max / min), it should be replaced by reduceByKey.
  • If the operation can be expressed as a combination of a local sequence operation and a merge operation (online variance / mean, top-n observations), it should be expressed with combineByKey or aggregateByKey.
  • If the final goal is to traverse values in a specific order (groupByKey followed by sorting the values followed by iteration), it can typically be rewritten as repartitionAndSortWithinPartitions with a custom partitioner and ordering, followed by mapPartitions.

case class WikipediaArticle(title: String, text: String) {
  def mentionsLanguage(lang: String): Boolean = text.split(' ').contains(lang)
}

val langs = List(
  "Java", "PHP", "Python", "Scala"
)

val wikiRdd: RDD[WikipediaArticle] = sc.parallelize(Seq(
  WikipediaArticle("a1", "a1 Scala"),
  WikipediaArticle("a2", "a2 Python"),
  WikipediaArticle("a3", "a3 Scala"),
  WikipediaArticle("a4", "a4 Java"),
  WikipediaArticle("a5", "a5 Java"),
  WikipediaArticle("a6", "a6 Scala")
))

def makeIndex(langs: List[String], rdd: RDD[WikipediaArticle]): RDD[(String, Iterable[WikipediaArticle])] = {
  rdd
    .flatMap((article: WikipediaArticle) => {
      langs
        .filter((lang: String) => {
          article.mentionsLanguage(lang)
        })
        .map((lang: String) => {
          (lang, article)
        })
    })
    .groupByKey()
}

def index: RDD[(String, Iterable[WikipediaArticle])] = makeIndex(langs, wikiRdd)
// Array(
//   (Java,CompactBuffer(WikipediaArticle(a4,a4 Java), WikipediaArticle(a5,a5 Java))),
//   (Python,CompactBuffer(WikipediaArticle(a2,a2 Python))),
//   (Scala,CompactBuffer(WikipediaArticle(a1,a1 Scala), WikipediaArticle(a3,a3 Scala), WikipediaArticle(a6,a6 Scala)))
// )

def rankLangsUsingIndex(index: RDD[(String, Iterable[WikipediaArticle])]): List[(String, Int)] = {
  index
    .map({
      case (lang, articles) => {
        (lang, articles.size)
      }
    })
    .sortBy(-_._2)
    .collect()
    .toList
}

rankLangsUsingIndex(index)
// List((Scala,3), (Java,2), (Python,1))

Spark Best Practices
https://github.com/beeva/beeva-best-practices/blob/master/big_data/spark/README.md

RDD actions and Transformations by Example
https://github.com/awesome-spark/spark-gotchas/blob/master/04_rdd_actions_and_transformations_by_example.md

Avoid groupByKey when performing an associative reductive operation
https://umbertogriffo.gitbooks.io/apache-spark-best-practices-and-tuning/content/avoid_groupbykey_when_performing_an_associative_re.html

reduceByKey()

Function signature: def reduceByKey(func: (V, V) ⇒ V): RDD[(K, V)]

The types of the two arguments of the function passed to reduceByKey() are determined by the PairRDD's V. For the same result, reduceByKey() is much more efficient than groupByKey() followed by reduce(), because values are already combined locally on each partition before the shuffle.

matrix = [
    (1, 1, 1),
    (1, 2, 1),
    (1, 3, 1),
    (1, 6, 0),
    (2, 6, 1),
    (3, 1, 1),
    (3, 5, 1),
    (4, 1, 0),
    (4, 4, 1),
]
df = spark.createDataFrame(matrix, ['k', 'v', 'whatever'])

df.rdd \
    .map(lambda row: (row[0], row[1])) \
    .reduceByKey(lambda x, y: x + y) \
    .collect()
# map() => [(1, 1), (1, 2), (1, 3), (1, 6), (2, 6), (3, 1), (3, 5), (4, 1), (4, 4)]
# reduceByKey() => [(1, 12), (2, 6), (3, 6), (4, 5)]

from pyspark.sql import Row

matrix = [
    (1, [Row(item=100, rating=1.0), Row(item=200, rating=2.0)]),
    (2, [Row(item=300, rating=3.0)]),
    (2, [Row(item=400, rating=4.0), Row(item=600, rating=6.0)]),
    (2, [Row(item=500, rating=5.0)]),
]
df = spark.createDataFrame(matrix, ['user', 'recommendations'])

candidate_k = 5  # assumed value for this example

def merge_recommendations(recommendations1, recommendations2):
    return recommendations1 + recommendations2

def slice_recommendations(row, candidate_k):
    user, recommendations = row
    sliced_recommendations = sorted(recommendations, key=lambda recommendation: recommendation.rating, reverse=True)[:candidate_k + 5]
    return (user, sliced_recommendations)

full_rdd = df \
    .rdd \
    .reduceByKey(lambda x, y: merge_recommendations(x, y)) \
    .map(lambda row: slice_recommendations(row, candidate_k))
final_df = spark.createDataFrame(full_rdd, ['user', 'recommendations'])

val langs = List(
  "Java", "PHP", "Python", "Scala"
)

val wikiRdd: RDD[WikipediaArticle] = sc.parallelize(Seq(
  WikipediaArticle("a1", "a1 Scala"),
  WikipediaArticle("a2", "a2 Python"),
  WikipediaArticle("a3", "a3 Scala"),
  WikipediaArticle("a4", "a4 Java"),
  WikipediaArticle("a5", "a5 Java"),
  WikipediaArticle("a6", "a6 Scala")
))

def rankLangsReduceByKey(langs: List[String], rdd: RDD[WikipediaArticle]): List[(String, Int)] = {
  rdd
    .flatMap((article: WikipediaArticle) => {
      langs
        .filter((lang: String) => {
          article.mentionsLanguage(lang)
        })
        .map((lang: String) => {
          (lang, 1)
        })
    })
    .reduceByKey((count1: Int, count2: Int) => {
      count1 + count2
    })
    .collect()
    .toList
    .sortWith(_._2 > _._2)
}

rankLangsReduceByKey(langs, wikiRdd)
// List((Scala,3), (Java,2), (Python,1))

Avoid reduceByKey when the input and output value types are different
http://backtobazics.com/big-data/spark/apache-spark-reducebykey-example/

Reduce a key-value pair into a key-list pair
http://stackoverflow.com/questions/27002161/reduce-a-key-value-pair-into-a-key-list-pair-with-apache-spark

foldByKey()

Function signature: def foldByKey(zeroValue: V)(func: (V, V) ⇒ V): RDD[(K, V)]

foldByKey() is basically reduceByKey() with an explicitly specified zero value.

matrix = [
    (1, 1, 1),
    (1, 2, 1),
    (1, 3, 1),
    (1, 6, 0),
    (2, 6, 1),
    (3, 1, 1),
    (3, 5, 1),
    (4, 1, 0),
    (4, 4, 1),
]
df = spark.createDataFrame(matrix, ['k', 'v', 'whatever'])

from operator import add

df.rdd \
    .map(lambda row: (row[0], [row[1], ])) \
    .foldByKey(list(), add) \
    .collect()
# [(4, [1, 4]), (1, [1, 2, 3, 6]), (2, [6]), (3, [1, 5])]

aggregateByKey()

Function signature: def aggregateByKey[U](zeroValue: U)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U): RDD[(K, U)]

A good replacement for patterns like .map(lambda row: (row['user'], [row['item'], ])).reduceByKey(lambda v1, v2: v1 + v2).

matrix = [
    (1, 1, 1),
    (1, 2, 1),
    (1, 4, 1),
    (1, 5, 1),
    (2, 2, 1),
    (2, 2, 1),
    (2, 3, 1),
    (3, 5, 1),
]
df = spark.createDataFrame(matrix, ['user', 'item', 'whatever'])

def seqFunc(item_set, item):
    item_set.add(item)
    return item_set

def combFunc(item_set1, item_set2):
    return item_set1.union(item_set2)

df.select('user', 'item').rdd \
    .aggregateByKey(set(), seqFunc, combFunc) \
    .collect()
# [(1, {1, 2, 4, 5}), (2, {2, 3}), (3, {5})]

ref:
http://codingjunkie.net/spark-agr-by-key/
https://stackoverflow.com/questions/31081563/apache-spark-what-is-the-equivalent-implementation-of-rdd-groupbykey-using-rd
https://stackoverflow.com/questions/24804619/how-does-spark-aggregate-function-aggregatebykey-work

combineByKey()

Function signature: def combineByKey[C](createCombiner: (V) ⇒ C, mergeValue: (C, V) ⇒ C, mergeCombiners: (C, C) ⇒ C): RDD[(K, C)]

Used to turn an RDD[(K, V)] into an RDD[(K, C)], where C can be any type.

combineByKey() takes three functions:

  • createCombiner, which turns a V into a C (e.g., creates a one-element list)
  • mergeValue, to merge a V into a C (e.g., adds it to the end of a list)
  • mergeCombiners, to combine two C’s into a single one.

matrix = [
    ('chinese', 80),
    ('math', 60),
    ('english', 100),
    ('chinese', 90),
    ('math', 100),
    ('math', 10),
    ('english', 70),
    ('english', 20),
    ('history', 30),
]
df = spark.createDataFrame(matrix, ['subject', 'score'])

def createCombiner(score):
    return (score, 1)

def mergeValue(accumulate, score):
    total_score = accumulate[0] + score
    total_count = accumulate[1] + 1
    return (total_score, total_count)

def mergeCombiners(accumulate1, accumulate2):
    total_score = accumulate1[0] + accumulate2[0]
    total_count = accumulate1[1] + accumulate2[1]
    return (total_score, total_count)

df.rdd.combineByKey(createCombiner, mergeValue, mergeCombiners).collect()
# you could calculate the average score of every subject
# [('chinese', (170, 2)),
#  ('history', (30, 1)),
#  ('math', (170, 3)),
#  ('english', (190, 3))]
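
As the comment above says, the average score per subject is then just total / count. A quick follow-up sketch on the same df:

df.rdd \
    .combineByKey(createCombiner, mergeValue, mergeCombiners) \
    .mapValues(lambda accumulate: round(accumulate[0] / float(accumulate[1]), 2)) \
    .collect()
# [('chinese', 85.0), ('history', 30.0), ('math', 56.67), ('english', 63.33)]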

ref:
https://zhangyi.gitbooks.io/spark-in-action/content/chapter2/rdd.html

cogroup()

If you want to join two RDDs that have both already been grouped by key, use cogroup() and avoid the flatMap + join + groupBy pattern.
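
A minimal PySpark sketch (the data is made up) showing that cogroup() groups both RDDs by key in a single pass:

x = sc.parallelize([('a', 1), ('a', 2), ('b', 3)])
y = sc.parallelize([('a', 4), ('c', 5)])

[(k, (sorted(v1), sorted(v2))) for k, (v1, v2) in sorted(x.cogroup(y).collect())]
# [('a', ([1, 2], [4])), ('b', ([3], [])), ('c', ([], [5]))]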

Setup Spark on macOS

Install

First, you need Java 8 JDK.
http://www.oracle.com/technetwork/java/javase/downloads/index.html

$ java -version
java version "1.8.0_131"

Homebrew Version

$ brew update
$ brew install maven apache-spark

Pre-built Version

$ mkdir -p /usr/local/share/apache-spark && \
  cd /usr/local/share/apache-spark && \
  wget https://www.apache.org/dyn/closer.lua/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz && \
  tar -xvzf spark-2.2.0-bin-hadoop2.7.tgz

ref:
http://spark.apache.org/downloads.html

Build Version

This is the recommended way.

$ brew install scala@2.11
$ export PATH="/usr/local/opt/scala@2.11/bin:$PATH"
$ scala -version
Scala code runner version 2.11.8 -- Copyright 2002-2016, LAMP/EPFL

$ mkdir -p /usr/local/share/apache-spark && \
  cd /usr/local/share/apache-spark && \
  wget https://d3kbcqa49mib13.cloudfront.net/spark-2.2.0.tgz && \
  tar -xvzf spark-2.2.0.tgz && \
  cd spark-2.2.0

$ ./build/mvn -Pnetlib-lgpl -Pyarn -Phadoop-2.7 -Dhadoop.version=2.7.3 -DskipTests -T 4C package
# or
$ ./build/mvn -Pnetlib-lgpl -Pyarn -Phadoop-2.7 -Dhadoop.version=2.7.3 -DskipTests clean package
$ spark-shell --packages "com.github.fommil.netlib:all:1.1.2"
scala> import com.github.fommil.netlib.BLAS
import com.github.fommil.netlib.BLAS
scala> BLAS.getInstance().getClass().getName()
res1: String = com.github.fommil.netlib.NativeSystemBLAS

ref:
http://spark.apache.org/downloads.html
http://spark.apache.org/docs/latest/building-spark.html
http://spark.apache.org/docs/latest/ml-guide.html#dependencies

Configurations

in .zshrc

if which java > /dev/null; then
  export JAVA_HOME="$(/usr/libexec/java_home -v 1.8)"
  export PATH="$JAVA_HOME/bin:$PATH"
fi

export PATH="/usr/local/opt/[email protected]/bin:$PATH"

# homebrew version
export SPARK_HOME="/usr/local/Cellar/apache-spark/2.2.0/libexec"
export PYTHONPATH="$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.4-src.zip:$PYTHONPATH"
export PYSPARK_DRIVER_PYTHON="ipython"

# pre-built version
export SPARK_HOME="/usr/local/share/apache-spark/spark-2.2.0-bin-hadoop2.7"
export PATH="$SPARK_HOME/bin:$PATH"
export PYTHONPATH="$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.4-src.zip:$PYTHONPATH"

# build version
export SPARK_HOME="/usr/local/share/apache-spark/spark-2.2.0"
export PATH="$SPARK_HOME/bin:$PATH"
export PYTHONPATH="$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.4-src.zip:$PYTHONPATH"

ref:
https://spark.apache.org/docs/latest/programming-guide.html
https://spark.apache.org/docs/latest/configuration.html

$ cd $SPARK_HOME

$ cp conf/spark-defaults.conf.template conf/spark-defaults.conf
spark.driver.memory              4g
spark.executor.memory            4g
spark.jars.packages              com.github.fommil.netlib:all:1.1.2,mysql:mysql-connector-java:5.1.41
spark.serializer                 org.apache.spark.serializer.KryoSerializer

$ cp conf/spark-env.sh.template conf/spark-env.sh
export PYTHONHASHSEED=42

$ cp conf/log4j.properties.template conf/log4j.properties

ref:
https://spark.apache.org/docs/latest/configuration.html

Commands

Local Mode

$ spark-shell

$ export PYSPARK_DRIVER_PYTHON="jupyter" && \
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --ip 0.0.0.0" && \
pyspark \
--packages "com.github.fommil.netlib:all:1.1.2,mysql:mysql-connector-java:5.1.41" \
--driver-memory 4g \
--executor-memory 4g \
--master "local[*]"

$ spark-shell \
--packages "com.github.fommil.netlib:all:1.1.2,mysql:mysql-connector-java:5.1.41" \
--master "local-cluster[3, 1, 4096]"

# Spark Application UI on the driver
$ open http://localhost:4040/

ref:
https://spark.apache.org/docs/latest/programming-guide.html

Standalone mode

There are two deploy modes for Spark Standalone. In client mode, the driver is launched in the same process as the client that submits the application. In cluster mode, however, the driver is launched on one of the worker processes inside the cluster.

$ ./sbin/start-master.sh -h localhost
$ ./sbin/start-slave.sh spark://localhost:7077

# Spark Web UI on the cluster manager
$ open http://localhost:8080/

$ pyspark \
--driver-memory 4g \
--executor-memory 4g \
--master spark://localhost:7077

$ spark-submit \
--master spark://localhost:7077 \
examples/src/main/python/pi.py 10

$ spark-submit \
--driver-memory 2g \
--driver-java-options "-XX:ThreadStackSize=81920" \
--total-executor-cores 3 \
--executor-cores 3 \
--executor-memory 12g \
--conf "spark.executor.extraJavaOptions=-XX:ThreadStackSize=81920" \
--master spark://localhost:7077 \
--packages "mysql:mysql-connector-java:5.1.41,com.hankcs:hanlp:portable-1.3.4,edu.stanford.nlp:stanford-corenlp:3.7.0" \
--jars "/Users/vinta/Projects/albedo/spark-data/stanford-corenlp-3.8.0-models.jar" \
--class ws.vinta.albedo.LogisticRegressionRanker \
target/albedo-1.0.0-SNAPSHOT.jar

# Spark Application UI on the driver
$ open http://localhost:4040/

ref:
https://spark.apache.org/docs/latest/spark-standalone.html
https://spark.apache.org/docs/latest/submitting-applications.html
https://spark.apache.org/docs/latest/configuration.html

Recommender System: Matrix Factorization

Singular Value Decomposition (SVD)

        item_a  item_b  item_c
user_1  2       -       -
user_2  -       1       -
user_3  3       3       -
user_4  -       2       2

We factorize the m x n rating matrix. One of the most commonly used factorization methods is SVD (Singular Value Decomposition), which decomposes the rating matrix into three matrices U, S and V*.
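
A minimal numpy sketch of this decomposition, assuming the missing ratings are filled with 0 (which is exactly the limitation discussed in the next section):

import numpy as np

# the toy rating matrix above, with missing ratings as 0
R = np.array([
    [2, 0, 0],
    [0, 1, 0],
    [3, 3, 0],
    [0, 2, 2],
])

U, s, Vt = np.linalg.svd(R, full_matrices=False)

# keep only the top-k singular values to get a low-rank approximation of R
k = 2
R_approx = U[:, :k] @ np.diag(s[:k]) @ Vt[:k, :]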

SVD 在推荐系统中的应用
http://yanyiwu.com/work/2012/09/10/SVD-application-in-recsys.html

奇异值分解 (SVD) 原理与在降维中的应用
http://www.cnblogs.com/pinard/p/6251584.html

Alternating Least Squares (ALS)

Because SVD requires a dense matrix while the rating matrix of a recommender system is usually a huge sparse matrix, in practice a variant of SVD called Funk-SVD is used instead. Funk-SVD is also known as the Latent Factor Model (LFM): it decomposes the m x n rating matrix R into two matrices U and I, an m x k user matrix and a k x n item matrix, each with k latent features, and only the non-zero ratings are taken into account during the computation.

ALS is one method for solving for the U and I matrices; Stochastic Gradient Descent (SGD) is another.
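
In Spark this is pyspark.ml.recommendation.ALS. A minimal sketch, where ratings_df is a hypothetical DataFrame with user, item and rating columns:

from pyspark.ml.recommendation import ALS

als = ALS(rank=50, maxIter=10, regParam=0.01,
          userCol='user', itemCol='item', ratingCol='rating')
model = als.fit(ratings_df)

model.userFactors.show()  # the user latent factor matrix (m rows of k features)
model.itemFactors.show()  # the item latent factor matrix (n rows of k features)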

ALS 在 Spark MLlib 中的实现
http://www.csdn.net/article/2015-05-07/2824641

矩阵分解在协同过滤推荐算法中的应用
http://www.cnblogs.com/pinard/p/6351319.html

基于矩阵分解的隐因子模型
http://www.voidcn.com/blog/winone361/article/p-5031282.html

ALS for implicit feedback

This model is also known as Weighted Regularized Matrix Factorization (WRMF).

        item_a  item_b  item_c
user_1  1       -       -
user_2  -       1       -
user_3  1       1       -
user_4  -       1       1

Notation:

  • Rui = User u's interaction on Item i
  • Xu = User u's vector
  • Yi = Item i's vector
  • Pui = 1 if Rui > 0 else 0
  • Cui = 1 + alpha x Rui

Parameters:

  • iteration: number of iterations
  • rank: number of latent factors
  • lambda: regularization parameter
  • alpha: confidence weight

As with explicit ALS, the m x n implicit feedback matrix R is factorized into two matrices X and Y: an m x k user latent factor matrix and a k x n item latent factor matrix. The difference is that two extra matrices are introduced: an m x n binary preference matrix P, where Pui indicates whether user u is interested in item i, and an m x n confidence matrix C (you can also think of it as strength), where Cui indicates how confident we are that user u is interested in item i. The goal is still to compute the user and item matrices; the differences from explicit ALS are that the loss function additionally includes Cui and Pui, and that every user-item pair is considered, including missing / non-observed values (Rui = 0).

Whenever a user has interacted with an item (Rui > 0), we assume the user has a preference for it (Pui = 1); the more interactions (the larger Rui), the larger Cui, i.e. the higher the confidence and the more trustworthy the assumption. If the user has never interacted with the item (Rui = 0), we assume no preference (Pui = 0), but since Cui = 1 + alpha x 0, this assumption carries a relatively low confidence and is less trustworthy.

Explicit ALS predicts a user's rating of an item; implicit ALS instead predicts whether a user will be interested in an item: Pui = Xu x Yi.T. In other words, the predictions produced by implicit ALS are the P matrix reconstructed from X x Y.T. These predictions mostly fall within [0, 1], but they can also be greater than 1 or less than 0; the larger the value, the stronger the user's preference for the item, and vice versa.
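
In Spark's ALS this corresponds to implicitPrefs=True, with the iteration / rank / lambda / alpha parameters above mapping to maxIter, rank, regParam and alpha. A minimal sketch, where interactions_df is a hypothetical DataFrame of (user, item, count) rows:

from pyspark.ml.recommendation import ALS

als = ALS(implicitPrefs=True,
          rank=50, maxIter=10, regParam=0.01, alpha=40.0,
          userCol='user', itemCol='item', ratingCol='count')
model = als.fit(interactions_df)

# the predicted "ratings" here approximate Pui, not an explicit rating
model.recommendForAllUsers(10).show(truncate=False)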

Since ALS is a form of collaborative filtering, it also suffers from the cold start problem: it cannot make recommendations for new users or new items. Its other drawbacks are a lack of interpretability and the difficulty of incorporating side information (features of the users and items themselves, such as gender, genre or category). On the other hand, because matrix factorization implicitly performs a kind of clustering, it is more robust to noise than many other approaches.

Collaborative Filtering for Implicit Feedback Datasets
http://yifanhu.net/PUB/cf.pdf

A Gentle Introduction to Recommender Systems with Implicit Feedback
https://jessesw.com/Rec-System/

Intro to Implicit Matrix Factorization: Classic ALS
http://blog.ethanrosenthal.com/2016/10/19/implicit-mf-part-1/

Spark ML 算法原理剖析
https://github.com/endymecy/spark-ml-source-analysis

Calculate the similarity of two vectors

scipy.spatial.distance
https://docs.scipy.org/doc/scipy/reference/spatial.distance.html

sklearn.metrics
http://scikit-learn.org/stable/modules/classes.html#module-sklearn.metrics

Distance

Euclidean distance

from sklearn.metrics.pairwise import euclidean_distances

euclidean_distances([[0, 0, 0, 0]], [[0, 0, 0, 0]])
# array([[ 0.]])

euclidean_distances([[1, 0, 1, 0]], [[1, 0, 1, 0]])
# array([[ 0.]])

euclidean_distances([[0, 1, 0, 1]], [[1, 0, 1, 0]])
# array([[ 2.]])

ref:
http://scikit-learn.org/stable/modules/generated/sklearn.metrics.pairwise.euclidean_distances.html

Manhattan distance

from sklearn.metrics.pairwise import manhattan_distances

manhattan_distances([[0, 0, 0, 0]], [[0, 0, 0, 0]])
# array([[ 0.]])

manhattan_distances([[1, 1, 1, 0]], [[1, 0, 0, 0]])
# array([[ 2.]])

manhattan_distances([[0, 1, 0, 1]], [[1, 0, 1, 0]])
# array([[ 4.]])

ref:
http://scikit-learn.org/stable/modules/generated/sklearn.metrics.pairwise.manhattan_distances.html

Similarity

Cosine similarity

from sklearn.metrics.pairwise import cosine_similarity
from sklearn.metrics.pairwise import cosine_distances
from sklearn.metrics.pairwise import pairwise_distances
from scipy.spatial.distance import pdist, squareform

# given a 2-D array `matrix`, the following expressions all produce the same similarity matrix
cosine_similarity(matrix)
1 - cosine_distances(matrix)
1 - pairwise_distances(matrix, metric='cosine')
1 - squareform(pdist(matrix, 'cosine'))

cosine_similarity([[0, 0, 0, 0]], [[0, 0, 0, 0]])
# array([[ 0.]])

cosine_similarity([[1, 0, 0, 0]], [[1, 0, 0, 0]])
# array([[ 1.]])

cosine_similarity([[1, 0, 1, 0]], [[0, 1, 0, 1]])
# array([[ 0.]])

cosine_similarity([[1, 0, 0, 1]], [[1, 0, 0, 0]])
# array([[ 0.70710678]])

cosine_similarity([[1, 0, 0, 1]], [[1, 0, 1, 0]])
# array([[ 0.5]])

ref:
http://scikit-learn.org/stable/modules/generated/sklearn.metrics.pairwise.cosine_similarity.html
http://scikit-learn.org/stable/modules/generated/sklearn.metrics.pairwise.cosine_distances.html
http://scikit-learn.org/stable/modules/generated/sklearn.metrics.pairwise.pairwise_distances.html
https://docs.scipy.org/doc/scipy/reference/generated/scipy.spatial.distance.pdist.html

Jaccard similarity coefficient score

from sklearn.metrics import jaccard_similarity_score

jaccard_similarity_score([0, 0, 0, 0], [0, 0, 0, 0])
# 1.0

jaccard_similarity_score([0, 0, 0, 0], [1, 0, 0, 0])
# 0.75

jaccard_similarity_score([1, 0, 0, 0], [1, 0, 0, 0])
# 1.0

jaccard_similarity_score([1, 0, 1, 0], [0, 1, 0, 1])
# 0.0

ref:
http://scikit-learn.org/stable/modules/generated/sklearn.metrics.jaccard_similarity_score.html

http://datascience.stackexchange.com/questions/5121/applications-and-differences-for-jaccard-similarity-and-cosine-similarity

Log-Likelihood similarity

TODO

Pearson correlation coefficient

It has a value between +1 and −1 inclusive, where 1 is total positive linear correlation, 0 is no linear correlation, and −1 is total negative linear correlation. You should only calculate Pearson correlations when the number of items in common between two users is greater than 1, preferably greater than 5 or 10, and only over the items the two users have both rated.

For high-dimensional binary attributes, the Pearson correlation coefficient and cosine similarity perform better than the Jaccard similarity coefficient.

from scipy.stats import pearsonr

pearsonr([1, 0, 1, 1], [0, 0, 0, 0])
# (nan, 1.0)

pearsonr([1, 0, 1, 1], [1, 0, 0, 0])
# (0.33333333333333331, 0.66666666666666607)

pearsonr([1, 0, 1, 0], [0, 1, 0, 1])
# (-1.0, 0.0)

ref:
https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.pearsonr.html
http://stackoverflow.com/questions/11429604/how-is-nan-handled-in-pearson-correlation-user-user-similarity-matrix-in-a-recom

Dissimilarity

Dice dissimilarity

from scipy.spatial.distance import dice
import numpy as np

v1 = np.array([0, 0, 0, 0])
v2 = np.array([0, 0, 0, 0])

try:
    sim = 1.0 - dice(v1.astype(bool), v2.astype(bool))
except ZeroDivisionError:
    sim = 0

ref:
https://docs.scipy.org/doc/scipy/reference/generated/scipy.spatial.distance.dice.html
https://docs.scipy.org/doc/scipy/reference/generated/scipy.spatial.distance.kulsinski.html
https://docs.scipy.org/doc/scipy/reference/generated/scipy.spatial.distance.sokalsneath.html
https://docs.scipy.org/doc/scipy/reference/generated/scipy.spatial.distance.yule.html

Recommender System: Collaborative Filtering

The dataset is a utility matrix of m users' ratings of n items. Because usually only some users have rated only some of the items, it is a sparse matrix. The goal is to use this sparse data to predict users' ratings of the items they have not rated yet. Besides ratings, the data can also be likes (and dislikes), purchases, page views and so on, and the ratings themselves can be divided into active (explicit) and passive (implicit) feedback.

Drawbacks of CF:

  • If there is no historical data for a user, no recommendations can be made at all.
  • Both user-based and item-based CF consume a lot of computing resources.
  • For most users the rated items are only a tiny fraction of all the data, so the matrix is very sparse and it is hard to find similar users or items.
  • There is a Matthew effect: the more popular an item is, the more likely it is to be recommended, so popular items are usually down-weighted.

CF is mainly divided into two categories: memory-based and model-based. User-based and item-based collaborative filtering are memory-based; memory-based CF is basically pure computation, with hardly any Machine Learning involved. Model-based CF is where Machine Learning actually comes in.

User-based Collaborative Filtering

        item_a  item_b  item_c
user_1  2       -       3
user_2  5       2       -
user_3  3       3       1
user_4  -       2       2

# the algorithm from "Mahout in Action"
for every other user w
  compute a similarity s between u and w
  retain the top users, ranked by similarity, as a neighborhood n

for every item i that some user in n has a preference for,
      but that u has no preference for yet
  for every other user v in n that has a preference for i
    compute a similarity s between u and v
    incorporate v's preference for i, weighted by s, into a running average

User-based CF considers the similarity between users.

Given a user A:
compute the similarity between A and every other user,
find the m most similar users,
then find the items those users have rated but A has not (optionally requiring that at least a certain number of users have rated each item),
estimate A's rating of each of these unrated items as an average of the similar users' ratings of that item, weighted by their similarity to A,
and finally recommend to A the n items with the highest estimated ratings.

Predicted rating of user_4 for item_a =
(user_4_user_1_sim x user_1_item_a_rating + user_4_user_3_sim x user_3_item_a_rating) / (user_4_user_1_sim + user_4_user_3_sim)
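
A minimal numpy sketch of this prediction on the toy matrix above, assuming cosine similarity computed over the commonly rated items (just one of the similarity measures listed below):

import numpy as np

# the toy rating matrix above; np.nan marks a missing rating
R = np.array([
    [2,      np.nan, 3],
    [5,      2,      np.nan],
    [3,      3,      1],
    [np.nan, 2,      2],
])

def user_sim(u, w):
    # cosine similarity over the items both users have rated
    mask = ~np.isnan(R[u]) & ~np.isnan(R[w])
    a, b = R[u][mask], R[w][mask]
    return a.dot(b) / (np.linalg.norm(a) * np.linalg.norm(b))

# predict user_4's rating of item_a from the users who did rate item_a (user_1 and user_3)
neighbors = [0, 2]
sims = np.array([user_sim(3, w) for w in neighbors])
ratings = np.array([R[w, 0] for w in neighbors])
prediction = (sims * ratings).sum() / sims.sum()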

Characteristics of user-based CF:

  • Suited to systems with far fewer users than items, since less similarity computation is needed.
  • Systems whose items are highly time-sensitive and diverse, such as news or social networking sites, are a good fit for user-based CF.
  • It is hard to give a reason for a recommendation.
  • Recommendations tend to be more serendipitous.

Commonly used similarity measures:

  • Pearson Correlation Coefficient
  • Cosine Similarity
  • Adjusted Cosine Similarity (some users tend to rate everything high or everything low; this measure removes that bias)

ref:
https://www.safaribooksonline.com/library/view/mahout-in-action/9781935182689/kindle_split_013.html

Item-based Collaborative Filtering

        user_1  user_2  user_3  user_4
item_a  2       5       3       -
item_b  -       2       3       2
item_c  3       -       1       2

# the algorithm from "Mahout in Action"
for every item i that u has no preference for yet
  for every item j that u has a preference for
    compute a similarity s between i and j
    add u's preference for j, weighted by s, to a running average
return the top items, ranked by weighted average

Item-based CF considers the similarity between items. It still uses exactly the same data as user-based CF, not the items' own features (that would be content-based filtering).

If there are far fewer items than users, the similarities between all pairs of items can be precomputed.
Given a user A:
find all the items A has not rated yet,
estimate A's rating of each unrated item as an average of A's ratings of the items they have already rated, weighted by the similarity between each rated item and the unrated item,
and finally recommend to A the n items with the highest estimated ratings.

Predicted rating of user_4 for item_a =
(item_b_item_a_sim x user_4_item_b_rating + item_c_item_a_sim x user_4_item_c_rating) / (item_b_item_a_sim + item_c_item_a_sim)

You can also ignore user A's rating history (or A may simply have none) and directly recommend the n items most similar to a given item.

Characteristics of item-based CF:

  • Suited to systems with far fewer items than users, since less similarity computation is needed.
  • Shopping, movie, music and book systems, where user interests are relatively stable, are a good fit for item-based CF.
  • It only recommends similar things, so serendipity and diversity are lower.
  • Item similarities usually only need to be recomputed frequently while the user base is small; as the number of users grows, the item similarities become stable.

ref:
https://ashokharnal.wordpress.com/2014/12/18/worked-out-example-item-based-collaborative-filtering-for-recommenmder-engine/
http://blog.csdn.net/huagong_adu/article/details/7362908

Slope One Recommender

        item_a  item_b  item_c
user_1  5       3       2
user_2  3       4       -
user_3  -       2       5

# the algorithm from "Mahout in Action"
for every item i the user u expresses no preference for
  for every item j that user u expresses a preference for
    find the average preference difference between j and i
    add this diff to u's preference value for j
    add this to a running average
return the top items, ranked by these averages

One problem with memory-based collaborative filtering is that the amount of computation grows dramatically with the data, so Slope One was proposed as a deliberately simple, brute-force algorithm, even though it still has to compute the average rating difference between every pair of items.

Slope One assumes the ratings of any two items follow a linear relationship y = mx + b with m = 1 (a slope of 1).
On average item_a is rated (2 + (-1)) / 2 = 0.5 higher than item_b.
On average item_a is rated (5 - 2) / 1 = 3 higher than item_c.
Using user_3's rating of item_b to predict their rating of item_a gives 2 + 0.5 = 2.5.
Using user_3's rating of item_c to predict their rating of item_a gives 5 + 3 = 8.
The individual predictions are usually weighted by how many users rated both items.

Predicted rating of user_3 for item_a =
((number of users who rated both item_a and item_b x the prediction from item_b) + (number of users who rated both item_a and item_c x the prediction from item_c)) / (number of users who rated both item_a and item_b + number of users who rated both item_a and item_c)
= ((2 x 2.5) + (1 x 8)) / (2 + 1) = 4.33
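
A minimal numpy sketch of this weighted Slope One prediction on the toy matrix above:

import numpy as np

# the toy rating matrix above; np.nan marks a missing rating
R = np.array([
    [5,      3, 2],
    [3,      4, np.nan],
    [np.nan, 2, 5],
])

def predict(u, i):
    numerator, denominator = 0.0, 0.0
    for j in range(R.shape[1]):
        if j == i or np.isnan(R[u, j]):
            continue
        both = ~np.isnan(R[:, i]) & ~np.isnan(R[:, j])  # users who rated both item i and item j
        count = both.sum()
        if count == 0:
            continue
        avg_diff = (R[both, i] - R[both, j]).mean()  # how much higher item i is rated than item j on average
        numerator += count * (R[u, j] + avg_diff)
        denominator += count
    return numerator / denominator

predict(2, 0)
# 4.33 (user_3's predicted rating of item_a)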

ref:
https://en.wikipedia.org/wiki/Slope_One