Spark RDD methods (Python / Scala)

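Some of the methods below are available on every RDD, while others exist only on PairRDDs. I have used both Spark's Python API and its Scala API in different projects, so the examples below mix the two. With the Scala API, pay close attention to the types each method accepts and returns; the Python API has no such restriction because Python is dynamically typed.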

ref:
http://spark.apache.org/docs/latest/programming-guide.html#transformations
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions

RDD Methods

map()

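The signature is def map[U](f: (T) ⇒ U): RDD[U]

The function passed to map() takes each element of the RDD as its input (each row, from a DataFrame point of view), func(row), and returns an arbitrary object (for example an int, a string, or a tuple). map() therefore produces an RDD with the same number of rows, although the element type may change.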

matrix = [
    (1, 1, 1),
    (1, 2, 1),
    (1, 3, 1),
    (1, 6, 0),
    (2, 6, 1),
    (3, 1, 1),
    (3, 5, 1),
    (4, 1, 0),
    (4, 4, 1),
]
df = spark.createDataFrame(matrix, ['k', 'v', 'whatever'])

df.rdd.map(lambda row: (row[0], row[1])).collect()
# [(1, 1), (1, 2), (1, 3), (1, 6), (2, 6), (3, 1), (3, 5), (4, 1), (4, 4)]

import org.apache.spark.rdd.RDD

val rdd: RDD[(String, Int)] = sc.parallelize(Seq(
  ("Python", 1),
  ("Scala", 3),
  ("Java", 2)
))
val test: RDD[String] = rdd.map({
  case (lang, count) => {
    s"${lang}_${count}"
  }
})
test.collect().foreach(println)
// Python_1
// Scala_3
// Java_2

flatMap()

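The signature is def flatMap[U](f: (T) ⇒ TraversableOnce[U]): RDD[U]

flatMap() is similar to map(): the function it accepts also takes each row of the RDD as its input, func(row). The difference is that the function must return an iterable (for example a tuple or a list, which may be empty), and flatMap() flattens the returned results. The count() after flatMap() may therefore be larger or smaller than that of the original RDD.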

matrix = [
    (1, 1, 1),
    (1, 2, 1),
    (1, 3, 1),
    (1, 6, 0),
    (2, 6, 1),
    (3, 1, 1),
    (3, 5, 1),
    (4, 1, 0),
    (4, 4, 1),
]
df = spark.createDataFrame(matrix, ['k', 'v', 'whatever'])

df.rdd.flatMap(lambda row: (row[0], row[1])).collect()
# [1, 1, 1, 2, 1, 3, 1, 6, 2, 6, 3, 1, 3, 5, 4, 1, 4, 4]

df.rdd.flatMap(lambda row: (row[2], )).collect()
# [1, 1, 1, 0, 1, 1, 1, 0, 1]

ref:
http://backtobazics.com/big-data/spark/apache-spark-flatmap-example/
http://apachesparkbook.blogspot.tw/2015/05/difference-between-map-flatmap-in-rdd.html
http://stackoverflow.com/questions/42867457/map-each-element-of-a-list-in-spark
http://stackoverflow.com/questions/21096432/list-or-iterator-of-tuples-returned-by-map-pyspark

reduce()

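The signature is def reduce(f: (T, T) ⇒ T): T

The function passed to reduce() takes two elements of the RDD at a time, func(element1, element2), and returns a single object of the same type as its inputs. The function should be commutative and associative so that it can be computed correctly in parallel. The whole reduce() call ends up producing one value.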

array = [
    1,
    5,
    4,
    2,
    3,
]
rdd = sc.parallelize(array)

rdd.reduce(lambda element1, element2: element1 + element2)
# 15

# named max_of to avoid shadowing Python's built-in max()
def max_of(element1, element2):
    return element1 if element1 > element2 else element2

rdd.reduce(max_of)
# 5

treeReduce()

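The signature is def treeReduce(f: (T, T) ⇒ T, depth: Int = 2): T

A plain reduce() sends the reduce result of every partition straight back to the driver machine for the final computation, which can become a bottleneck when there are many partitions and each partition holds a lot of data. In that case you can switch to treeReduce(), although it can backfire when used inappropriately. The same relationship holds between aggregate() and treeAggregate().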
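
To make the difference concrete, here is a minimal pure-Python sketch that models partitions as plain lists. It is a simplified model, not Spark's actual implementation: plain_reduce, tree_reduce and the fan_in parameter are names made up for this illustration, and Spark derives its merge levels from depth rather than from a fixed fan-in. It only shows how many partial results are left for the final merge on the driver.

```python
from functools import reduce
from operator import add


def plain_reduce(partitions, f):
    # Each (non-empty) partition is reduced locally,
    # then ALL partial results are merged in one place (the driver).
    partials = [reduce(f, part) for part in partitions]
    return reduce(f, partials), len(partials)


def tree_reduce(partitions, f, fan_in=2):
    # Partial results are merged in groups of `fan_in`, level by level,
    # until only a few are left for the final merge.
    partials = [reduce(f, part) for part in partitions]
    while len(partials) > fan_in:
        partials = [
            reduce(f, partials[i:i + fan_in])
            for i in range(0, len(partials), fan_in)
        ]
    return reduce(f, partials), len(partials)


partitions = [[1, 2], [3, 4], [5, 6], [7, 8], [9, 10], [11, 12], [13, 14], [15, 16]]

# (result, number of partial results merged in the final step)
print(plain_reduce(partitions, add))
# (136, 8)
print(tree_reduce(partitions, add))
# (136, 2)
```

Both variants return the same value; the tree version trades extra intermediate rounds for a smaller final merge, which is why it can also be slower when there are only a few partitions.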

ref:
https://umbertogriffo.gitbooks.io/apache-spark-best-practices-and-tuning/content/treereduce_and_treeaggregate_demystified.html

aggregate()

The signature is def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U): U

import org.apache.spark.rdd.RDD

case class WikipediaArticle(title: String, text: String) {
  def mentionsLanguage(lang: String): Boolean = text.split(' ').contains(lang)
}

val wikiRdd: RDD[WikipediaArticle] = sc.parallelize(Seq(
  WikipediaArticle("a1", "abc Scala xyz"),
  WikipediaArticle("a2", "abc Python xyz"),
  WikipediaArticle("a3", "abc Scala xyz"),
  WikipediaArticle("a4", "abc Scala xyz")
))

def occurrencesOfLang(lang: String, rdd: RDD[WikipediaArticle]): Int = {
  val accumulateOp = (total: Int, a: WikipediaArticle) => total + 1
  val combineOp = (count1: Int, count2: Int) => count1 + count2
  rdd.filter(_.mentionsLanguage(lang)).aggregate(0)(accumulateOp, combineOp)
}

occurrencesOfLang("Scala", wikiRdd)
// 3
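
The roles of zeroValue, seqOp and combOp in the signature can be sketched in pure Python. This is only a model of the semantics under the assumption that partitions are plain lists; the aggregate helper below is a hypothetical stand-in written for this note, not the Spark API.

```python
from functools import reduce


def aggregate(partitions, zero_value, seq_op, comb_op):
    # seq_op folds the elements of each partition into an accumulator (type U),
    # starting from zero_value
    partials = [reduce(seq_op, part, zero_value) for part in partitions]
    # comb_op merges the per-partition accumulators into the final result
    return reduce(comb_op, partials, zero_value)


partitions = [[1, 5], [4, 2, 3]]

# the accumulator is a (sum, count) tuple, a different type from the int elements
total, count = aggregate(
    partitions,
    (0, 0),
    lambda acc, x: (acc[0] + x, acc[1] + 1),
    lambda a, b: (a[0] + b[0], a[1] + b[1]),
)
print((total, count))
# (15, 5)
print(total / count)
# 3.0
```

Unlike reduce(), the result type U does not have to match the element type T, which is why two separate functions are needed.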

ref:
https://stackoverflow.com/questions/28240706/explain-the-aggregate-functionality-in-spark

PairRDD Methods

A PairRDD is an RDD of key/value pairs that looks like RDD[(K, V)]. K and V may be primitive types, but they may also be other objects or collections. The transformations in PairRDDFunctions also return pair RDDs.

ref:
https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html

mapValues()

The signature is def mapValues[U](f: (V) ⇒ U): RDD[(K, U)]

The function passed to mapValues() takes a single argument, the V of the PairRDD; it only processes the values and leaves the keys untouched.

val rdd = sc.parallelize(Seq(
  ("Python", 1),
  ("Scala", 3),
  ("Java", 2)
))

rdd
  .mapValues((count: Int) => {
    count * 10
  })
  .collect()
// Array((Python,10), (Scala,30), (Java,20))

ref:
http://apachesparkbook.blogspot.tw/2015/12/mapvalues-example.html

groupByKey()

The signature is def groupByKey(): RDD[(K, Iterable[V])]

Avoid groupByKey() in the following cases:

  • If the operation is expressed using groupByKey followed by an associative and commutative reducing operation on values (sum, count, max / min), it should be replaced by reduceByKey.
  • If the operation can be expressed using a combination of a local sequence operation and a merge operation (online variance / mean, top-n observations), it should be expressed with combineByKey or aggregateByKey.
  • If the final goal is to traverse values in a specific order (groupByKey followed by sorting values followed by iteration), it can typically be rewritten as repartitionAndSortWithinPartitions with a custom partitioner and ordering followed by mapPartitions.

case class WikipediaArticle(title: String, text: String) {
  def mentionsLanguage(lang: String): Boolean = text.split(' ').contains(lang)
}

val langs = List(
  "Java", "PHP", "Python", "Scala"
)

val wikiRdd: RDD[WikipediaArticle] = sc.parallelize(Seq(
  WikipediaArticle("a1", "a1 Scala"),
  WikipediaArticle("a2", "a2 Python"),
  WikipediaArticle("a3", "a3 Scala"),
  WikipediaArticle("a4", "a4 Java"),
  WikipediaArticle("a5", "a5 Java"),
  WikipediaArticle("a6", "a6 Scala")
))

def makeIndex(langs: List[String], rdd: RDD[WikipediaArticle]): RDD[(String, Iterable[WikipediaArticle])] = {
  rdd
    .flatMap((article: WikipediaArticle) => {
      langs
        .filter((lang: String) => {
          article.mentionsLanguage(lang)
        })
        .map((lang: String) => {
          (lang, article)
        })
    })
    .groupByKey()
}

def index: RDD[(String, Iterable[WikipediaArticle])] = makeIndex(langs, wikiRdd)
// Array(
//   (Java,CompactBuffer(WikipediaArticle(a4,a4 Java), WikipediaArticle(a5,a5 Java))),
//   (Python,CompactBuffer(WikipediaArticle(a2,a2 Python))),
//   (Scala,CompactBuffer(WikipediaArticle(a1,a1 Scala), WikipediaArticle(a3,a3 Scala), WikipediaArticle(a6,a6 Scala)))
// )

def rankLangsUsingIndex(index: RDD[(String, Iterable[WikipediaArticle])]): List[(String, Int)] = {
  index
    .map({
      case (lang, articles) => {
        (lang, articles.size)
      }
    })
    .sortBy(-_._2)
    .collect()
    .toList
}

rankLangsUsingIndex(index)
// List((Scala,3),(Java,2),(Python,1))
// PHP does not appear because no article mentions it, so it never becomes a key in the index

Spark Best Practices
https://github.com/beeva/beeva-best-practices/blob/master/big_data/spark/README.md

RDD actions and Transformations by Example
https://github.com/awesome-spark/spark-gotchas/blob/master/04_rdd_actions_and_transformations_by_example.md

Avoid groupByKey when performing an associative reductive operation
https://umbertogriffo.gitbooks.io/apache-spark-best-practices-and-tuning/content/avoid_groupbykey_when_performing_an_associative_re.html

reduceByKey()

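The signature is def reduceByKey(func: (V, V) ⇒ V): RDD[(K, V)]

The types of the two arguments of the function passed to reduceByKey() are determined by the V of the PairRDD. For the same result, reduceByKey() is much more efficient than groupByKey() followed by reduce(), because values are combined within each partition before the shuffle.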
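
The efficiency gap comes from how many records have to be shuffled. The following pure-Python sketch models partitions as lists and counts the records that would cross the network; the two helper functions are hypothetical names invented for this illustration and only approximate what Spark does internally.

```python
from collections import defaultdict
from operator import add


def sum_with_group_by_key(partitions):
    # groupByKey: every (key, value) record is shuffled unchanged
    shuffled = [pair for part in partitions for pair in part]
    grouped = defaultdict(list)
    for key, value in shuffled:
        grouped[key].append(value)
    return {key: sum(values) for key, values in grouped.items()}, len(shuffled)


def sum_with_reduce_by_key(partitions, func):
    # reduceByKey: values are first combined per key inside each partition,
    # so at most one record per key leaves each partition
    shuffled = []
    for part in partitions:
        local = {}
        for key, value in part:
            local[key] = func(local[key], value) if key in local else value
        shuffled.extend(local.items())
    merged = {}
    for key, value in shuffled:
        merged[key] = func(merged[key], value) if key in merged else value
    return merged, len(shuffled)


partitions = [
    [(1, 1), (1, 2), (1, 3)],
    [(1, 6), (2, 6), (3, 1)],
    [(3, 5), (4, 1), (4, 4)],
]

# (result, number of shuffled records)
print(sum_with_group_by_key(partitions))
# ({1: 12, 2: 6, 3: 6, 4: 5}, 9)
print(sum_with_reduce_by_key(partitions, add))
# ({1: 12, 2: 6, 3: 6, 4: 5}, 6)
```

The saving grows with the number of repeated keys per partition; when every key is unique within its partition, both approaches shuffle the same number of records.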

matrix = [
    (1, 1, 1),
    (1, 2, 1),
    (1, 3, 1),
    (1, 6, 0),
    (2, 6, 1),
    (3, 1, 1),
    (3, 5, 1),
    (4, 1, 0),
    (4, 4, 1),
]
df = spark.createDataFrame(matrix, ['k', 'v', 'whatever'])

df.rdd \
    .map(lambda row: (row[0], row[1])) \
    .reduceByKey(lambda x, y: x + y) \
    .collect()
# map() => [(1, 1), (1, 2), (1, 3), (1, 6), (2, 6), (3, 1), (3, 5), (4, 1), (4, 4)]
# reduceByKey() => [(1, 12), (2, 6), (3, 6), (4, 5)]

matrix = [
    (1, [[100, 1.0], [200, 2.0]]),
    (2, [[300, 3.0]]),
    (2, [[400, 4.0], [600, 6.0]]),
    (2, [[500, 5.0]]),
]
df = spark.createDataFrame(matrix, ['user', 'recommendations'])

def merge_recommendations(recommendations1, recommendations2):
    return recommendations1 + recommendations2

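# candidate_k is not defined in the original snippet; 10 is an assumed value for illustration
candidate_k = 10

def slice_recommendations(row, candidate_k):
    user, recommendations = row
    # each recommendation is an [item, rating] pair, so sort by the rating at index 1
    sliced_recommendations = sorted(recommendations, key=lambda recommendation: recommendation[1], reverse=True)[:candidate_k + 5]
    return (user, sliced_recommendations)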

full_rdd = df \
    .rdd \
    .reduceByKey(lambda x, y: merge_recommendations(x, y)) \
    .map(lambda row: slice_recommendations(row, candidate_k))
final_df = spark.createDataFrame(full_rdd, ['user', 'recommendations'])

val langs = List(
  "Java", "PHP", "Python", "Scala"
)

val wikiRdd: RDD[WikipediaArticle] = sc.parallelize(Seq(
  WikipediaArticle("a1", "a1 Scala"),
  WikipediaArticle("a2", "a2 Python"),
  WikipediaArticle("a3", "a3 Scala"),
  WikipediaArticle("a4", "a4 Java"),
  WikipediaArticle("a5", "a5 Java"),
  WikipediaArticle("a6", "a6 Scala")
))

def rankLangsReduceByKey(langs: List[String], rdd: RDD[WikipediaArticle]): List[(String, Int)] = {
  rdd
    .flatMap((article: WikipediaArticle) => {
      langs
        .filter((lang: String) => {
          article.mentionsLanguage(lang)
        })
        .map((lang: String) => {
          (lang, 1)
        })
    })
    .reduceByKey((count1: Int, count2: Int) => {
      count1 + count2
    })
    .collect()
    .toList
    .sortWith(_._2 > _._2)
}

rankLangsReduceByKey(langs, wikiRdd)
// List((Scala,3),(Java,2),(Python,1))
// PHP does not appear because no article mentions it, so no (PHP, 1) pair is ever emitted

Avoid reduceByKey when the input and output value types are different
http://backtobazics.com/big-data/spark/apache-spark-reducebykey-example/

Reduce a key-value pair into a key-list pair
http://stackoverflow.com/questions/27002161/reduce-a-key-value-pair-into-a-key-list-pair-with-apache-spark

foldByKey()

The signature is def foldByKey(zeroValue: V)(func: (V, V) ⇒ V): RDD[(K, V)]

foldByKey() is basically reduceByKey() with a zero value that you specify manually.

matrix = [
    (1, 1, 1),
    (1, 2, 1),
    (1, 3, 1),
    (1, 6, 0),
    (2, 6, 1),
    (3, 1, 1),
    (3, 5, 1),
    (4, 1, 0),
    (4, 4, 1),
]
df = spark.createDataFrame(matrix, ['k', 'v', 'whatever'])

from operator import add  # add() concatenates the per-key lists

df.rdd \
    .map(lambda row: (row[0], [row[1], ])) \
    .foldByKey(list(), add) \
    .collect()
# [(4, [1, 4]), (1, [1, 2, 3, 6]), (2, [6]), (3, [1, 5])]

aggregateByKey()

The signature is def aggregateByKey[U](zeroValue: U)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U): RDD[(K, U)]

It is a good replacement for the pattern .map(lambda row: (row['user'], [row['item'], ])).reduceByKey(lambda v1, v2: v1 + v2).

matrix = [
    (1, 1, 1),
    (1, 2, 1),
    (1, 4, 1),
    (1, 5, 1),
    (2, 2, 1),
    (2, 2, 1),
    (2, 3, 1),
    (3, 5, 1),
]
df = spark.createDataFrame(matrix, ['k', 'v', 'whatever'])

def seqFunc(item_set, item):
    item_set.add(item)
    return item_set

def combFunc(item_set1, item_set2):
    return item_set1.union(item_set2)

df.select('k', 'v').rdd \
    .aggregateByKey(set(), seqFunc, combFunc) \
    .collect()
# [(1, {1, 2, 4, 5}), (2, {2, 3}), (3, {5})]

ref:
http://codingjunkie.net/spark-agr-by-key/
https://stackoverflow.com/questions/31081563/apache-spark-what-is-the-equivalent-implementation-of-rdd-groupbykey-using-rd
https://stackoverflow.com/questions/24804619/how-does-spark-aggregate-function-aggregatebykey-work

combineByKey()

The signature is def combineByKey[C](createCombiner: (V) ⇒ C, mergeValue: (C, V) ⇒ C, mergeCombiners: (C, C) ⇒ C): RDD[(K, C)]

It turns an RDD[(K, V)] into an RDD[(K, C)], where C can be any type.

combineByKey() takes three functions:

  • createCombiner, which turns a V into a C (e.g., creates a one-element list)
  • mergeValue, to merge a V into a C (e.g., adds it to the end of a list)
  • mergeCombiners, to combine two C’s into a single one.

import pyspark.sql.functions as F

matrix = [
    ('chinese', 80),
    ('math', 60),
    ('english', 100),
    ('chinese', 90),
    ('math', 100),
    ('math', 10),
    ('english', 70),
    ('english', 20),
    ('history', 30),
]
df = spark.createDataFrame(matrix, ['subject', 'score'])

def createCombiner(score):
    return (score, 1)

def mergeValue(accumulate, score):
    total_score = accumulate[0] + score
    total_count = accumulate[1] + 1
    return (total_score, total_count)

def mergeCombiners(accumulate1, accumulate2):
    total_score = accumulate1[0] + accumulate2[0]
    total_count = accumulate1[1] + accumulate2[1]
    return (total_score, total_count)

df.rdd.combineByKey(createCombiner, mergeValue, mergeCombiners).collect()
# the (total_score, total_count) pairs let you calculate the average score of every subject
# [('chinese', (170, 2)),
#  ('history', (30, 1)),
#  ('math', (170, 3)),
#  ('english', (190, 3))]
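
To see how the three functions cooperate and how the averages fall out of the result, here is a pure-Python sketch of the same computation. The combine_by_key helper and the split of the scores into two partitions are assumptions made for illustration; Spark decides the real partitioning and does the merging itself.

```python
def combine_by_key(partitions, create_combiner, merge_value, merge_combiners):
    # Within a partition: the first value of a key goes through create_combiner,
    # later values of the same key go through merge_value.
    per_partition = []
    for part in partitions:
        local = {}
        for key, value in part:
            if key in local:
                local[key] = merge_value(local[key], value)
            else:
                local[key] = create_combiner(value)
        per_partition.append(local)
    # Across partitions: combiners of the same key go through merge_combiners.
    merged = {}
    for local in per_partition:
        for key, combiner in local.items():
            if key in merged:
                merged[key] = merge_combiners(merged[key], combiner)
            else:
                merged[key] = combiner
    return merged


# the same scores as above, split into two hypothetical partitions
partitions = [
    [('chinese', 80), ('math', 60), ('english', 100), ('chinese', 90), ('math', 100)],
    [('math', 10), ('english', 70), ('english', 20), ('history', 30)],
]

totals = combine_by_key(
    partitions,
    lambda score: (score, 1),
    lambda acc, score: (acc[0] + score, acc[1] + 1),
    lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]),
)
averages = {subject: total / count for subject, (total, count) in totals.items()}

print(totals['math'])
# (170, 3)
print(averages['chinese'])
# 85.0
```

In Spark itself, the last step would typically be a mapValues() over the combined pairs.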

ref:
https://zhangyi.gitbooks.io/spark-in-action/content/chapter2/rdd.html

cogroup()

If you want to join two RDDs that have already been grouped by key, use cogroup(). Avoid the flatMap + join + groupBy pattern.
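
What cogroup() returns can be sketched in pure Python: for every key found in either input, one pair holding the values from each side. The cogroup function and the clicks / purchases data below are hypothetical and only model the semantics; they are not the Spark API.

```python
from collections import defaultdict


def cogroup(pairs_a, pairs_b):
    # key -> (values from pairs_a, values from pairs_b)
    grouped = defaultdict(lambda: ([], []))
    for key, value in pairs_a:
        grouped[key][0].append(value)
    for key, value in pairs_b:
        grouped[key][1].append(value)
    return dict(grouped)


clicks = [("u1", "item_a"), ("u1", "item_b"), ("u2", "item_c")]
purchases = [("u1", "item_b"), ("u3", "item_d")]

result = cogroup(clicks, purchases)
print(result["u1"])
# (['item_a', 'item_b'], ['item_b'])
print(result["u3"])
# ([], ['item_d'])
```

Keys that exist on only one side are kept with an empty collection for the other side, so a single pass covers what would otherwise need a join followed by a regrouping.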

Setup Spark on macOS

Install

First, you need Java 8 JDK.
http://www.oracle.com/technetwork/java/javase/downloads/index.html

$ java -version
java version "1.8.0_131"

Homebrew Version

$ brew update
$ brew install maven apache-spark

Pre-built Version

$ mkdir -p /usr/local/share/apache-spark && \
  cd /usr/local/share/apache-spark && \
  wget https://archive.apache.org/dist/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz && \
  tar -xvzf spark-2.2.0-bin-hadoop2.7.tgz

ref:
http://spark.apache.org/downloads.html

Build Version

This is the recommended way.

$ brew install scala@2.11
$ export PATH="/usr/local/opt/scala@2.11/bin:$PATH"
$ scala -version
Scala code runner version 2.11.8 -- Copyright 2002-2016, LAMP/EPFL

$ mkdir -p /usr/local/share/apache-spark && \
  cd /usr/local/share/apache-spark && \
  wget https://d3kbcqa49mib13.cloudfront.net/spark-2.2.0.tgz && \
  tar -xvzf spark-2.2.0.tgz && \
  cd spark-2.2.0

$ ./build/mvn -Pnetlib-lgpl -Pyarn -Phadoop-2.7 -Dhadoop.version=2.7.3 -DskipTests -T 4C package
# or
$ ./build/mvn -Pnetlib-lgpl -Pyarn -Phadoop-2.7 -Dhadoop.version=2.7.3 -DskipTests clean package

# check which BLAS implementation is loaded
$ spark-shell --packages "com.github.fommil.netlib:all:1.1.2"
scala> import com.github.fommil.netlib.BLAS
import com.github.fommil.netlib.BLAS
scala> BLAS.getInstance().getClass().getName()
res1: String = com.github.fommil.netlib.NativeSystemBLAS

ref:
http://spark.apache.org/downloads.html
http://spark.apache.org/docs/latest/building-spark.html
http://spark.apache.org/docs/latest/ml-guide.html#dependencies

Configurations

in .zshrc

if which java > /dev/null; then
  export JAVA_HOME="$(/usr/libexec/java_home -v 1.8)"
  export PATH="$JAVA_HOME/bin:$PATH"
fi

export PATH="/usr/local/opt/scala@2.11/bin:$PATH"

# homebrew version
export SPARK_HOME="/usr/local/Cellar/apache-spark/2.2.0/libexec"
export PYTHONPATH="$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.4-src.zip:$PYTHONPATH"
export PYSPARK_DRIVER_PYTHON="ipython"

# pre-built version
export SPARK_HOME="/usr/local/share/apache-spark/spark-2.2.0-bin-hadoop2.7"
export PATH="$SPARK_HOME/bin:$PATH"
export PYTHONPATH="$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.4-src.zip:$PYTHONPATH"

# build version
export SPARK_HOME="/usr/local/share/apache-spark/spark-2.2.0"
export PATH="$SPARK_HOME/bin:$PATH"
export PYTHONPATH="$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.4-src.zip:$PYTHONPATH"

ref:
https://spark.apache.org/docs/latest/programming-guide.html
https://spark.apache.org/docs/latest/configuration.html

$ cd $SPARK_HOME

$ cp conf/spark-defaults.conf.template conf/spark-defaults.conf
spark.driver.memory              4g
spark.executor.memory            4g
spark.jars.packages              com.github.fommil.netlib:all:1.1.2,mysql:mysql-connector-java:5.1.41
spark.serializer                 org.apache.spark.serializer.KryoSerializer

$ cp conf/spark-env.sh.template conf/spark-env.sh
export PYTHONHASHSEED=42

$ cp conf/log4j.properties.template conf/log4j.properties

ref:
https://spark.apache.org/docs/latest/configuration.html

Commands

Local Mode

$ spark-shell

$ export PYSPARK_DRIVER_PYTHON="jupyter" && \
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --ip 0.0.0.0" && \
pyspark \
--packages "com.github.fommil.netlib:all:1.1.2,mysql:mysql-connector-java:5.1.41" \
--driver-memory 4g \
--executor-memory 4g \
--master "local[*]"

$ spark-shell \
--packages "com.github.fommil.netlib:all:1.1.2,mysql:mysql-connector-java:5.1.41" \
--master "local-cluster[3, 1, 4096]"

# Spark Application UI on the driver
$ open http://localhost:4040/

ref:
https://spark.apache.org/docs/latest/programming-guide.html

Standalone mode

There are two deploy modes for Spark Standalone. In client mode, the driver is launched in the same process as the client that submits the application. In cluster mode, however, the driver is launched on one of the Worker processes inside the cluster.

$ ./sbin/start-master.sh -h localhost
$ ./sbin/start-slave.sh spark://localhost:7077

# Spark Web UI on the cluster manager
$ open http://localhost:8080/

$ pyspark \
--driver-memory 4g \
--executor-memory 4g \
--master spark://localhost:7077

$ spark-submit \
--master spark://localhost:7077 \
examples/src/main/python/pi.py 10

$ spark-submit \
--driver-memory 2g \
--driver-java-options "-XX:ThreadStackSize=81920" \
--total-executor-cores 3 \
--executor-cores 3 \
--executor-memory 12g \
--conf "spark.executor.extraJavaOptions=-XX:ThreadStackSize=81920" \
--master spark://localhost:7077 \
--packages "mysql:mysql-connector-java:5.1.41,com.hankcs:hanlp:portable-1.3.4,edu.stanford.nlp:stanford-corenlp:3.7.0" \
--jars "/Users/vinta/Projects/albedo/spark-data/stanford-corenlp-3.8.0-models.jar" \
--class ws.vinta.albedo.LogisticRegressionRanker \
target/albedo-1.0.0-SNAPSHOT.jar

# Spark Application UI on the driver
$ open http://localhost:4040/

ref:
https://spark.apache.org/docs/latest/spark-standalone.html
https://spark.apache.org/docs/latest/submitting-applications.html
https://spark.apache.org/docs/latest/configuration.html

Recommender System: Matrix Factorization

Singular Value Decomposition (SVD)

        item_a  item_b  item_c
user_1  2       -       -
user_2  -       1       -
user_3  3       3       -
user_4  -       2       2

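The m x n rating matrix is factorized. One of the common matrix factorization methods is SVD (singular value decomposition), which decomposes the rating matrix into three matrices U S V*.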

SVD 在推荐系统中的应用
http://yanyiwu.com/work/2012/09/10/SVD-application-in-recsys.html

奇异值分解 (SVD) 原理与在降维中的应用
http://www.cnblogs.com/pinard/p/6251584.html

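Alternating Least Squares (ALS)

Because SVD requires a dense matrix and the rating matrix in a recommender system is usually a very large sparse matrix, in practice a variant of SVD called Funk-SVD is used. Funk-SVD is also known as the Latent Factor Model (LFM): it factorizes the m x n rating matrix R into two matrices U and I, an m x k users matrix and a k x n items matrix, each with k latent features, and only the observed (non-zero) ratings are considered during computation.

ALS is one way of solving for the U and I matrices. Another is Stochastic Gradient Descent (SGD).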
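
As a rough illustration of factorizing only the observed ratings, here is a small pure-Python sketch that trains the two factor matrices with SGD (not ALS, which needs a linear solver) on the example rating matrix from the SVD section. The function names and the hyperparameters (k, lr, reg, epochs, seed) are assumptions chosen for this toy example, and the items matrix is stored as n x k (one row per item) for convenience.

```python
import random


def dot(a, b):
    return sum(x * y for x, y in zip(a, b))


def train_funk_svd(ratings, n_users, n_items, k=2, lr=0.02, reg=0.02, epochs=500, seed=42):
    rng = random.Random(seed)
    # U is n_users x k, I is n_items x k: a prediction is the dot product of two rows
    U = [[rng.random() for _ in range(k)] for _ in range(n_users)]
    I = [[rng.random() for _ in range(k)] for _ in range(n_items)]
    for _ in range(epochs):
        for u, i, r in ratings:  # missing ratings are never visited
            err = r - dot(U[u], I[i])
            for f in range(k):
                u_f, i_f = U[u][f], I[i][f]
                U[u][f] += lr * (err * i_f - reg * u_f)
                I[i][f] += lr * (err * u_f - reg * i_f)
    return U, I


def rmse(ratings, U, I):
    squared = [(r - dot(U[u], I[i])) ** 2 for u, i, r in ratings]
    return (sum(squared) / len(squared)) ** 0.5


# (user index, item index, rating) for the observed cells only
ratings = [(0, 0, 2), (1, 1, 1), (2, 0, 3), (2, 1, 3), (3, 1, 2), (3, 2, 2)]

U0, I0 = train_funk_svd(ratings, n_users=4, n_items=3, epochs=0)  # random start
U, I = train_funk_svd(ratings, n_users=4, n_items=3)

print(rmse(ratings, U0, I0), rmse(ratings, U, I))
```

The error on the observed ratings should drop after training, and dot(U[u], I[i]) then gives a predicted rating for any user-item pair, including the cells that were missing.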

ALS 在 Spark MLlib 中的实现
http://www.csdn.net/article/2015-05-07/2824641

矩阵分解在协同过滤推荐算法中的应用
http://www.cnblogs.com/pinard/p/6351319.html

基于矩阵分解的隐因子模型
http://www.voidcn.com/blog/winone361/article/p-5031282.html

ALS for implicit feedbacks

This model is also known as Weighted Regularized Matrix Factorization (WRMF).

        item_a  item_b  item_c
user_1  1       -       -
user_2  -       1       -
user_3  1       1       -
user_4  -       1       1

Notation:

  • Rui = User u's interaction on Item i
  • Xu = User u's vector
  • Yi = Item i's vector
  • Pui = 1 if Rui > 0 else 0
  • Cui = 1 + alpha x Rui

Parameters:

  • iteration: number of iterations
  • rank: number of latent factors
  • lambda: regularization parameter
  • alpha: confidence weight

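As in explicit ALS, the m x n implicit feedback matrix R is factorized into two matrices X and Y: an m x k matrix of user latent factors and a k x n matrix of item latent factors. The difference is that two extra matrices are introduced: the m x n matrix P of binary preferences, where Pui indicates whether the user is interested in the item, and the m x n matrix C of confidence (or think of it as strength), where Cui indicates how sure we are that the user is interested in the item. The goal of the algorithm is still to compute the users and items matrices; compared with explicit ALS, the loss function additionally includes Cui and Pui, and it takes every user-item pair into account, including missing / non-observed values (Rui = 0).

As long as the user has interacted with the item (Rui > 0), we assume the user has a preference for it (Pui = 1); the more interactions there are (the larger Rui), the larger Cui becomes, meaning higher confidence that the assumption holds. If the user has not interacted with the item (Rui = 0), we assume the user has no preference for it (Pui = 0), but since Cui = 1 + alpha x 0 = 1, the confidence in this assumption is relatively low.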
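
The two definitions translate directly into code. In this small sketch the interaction counts in R and the value of alpha are made up for illustration; only the formulas for Pui and Cui come from the notation above.

```python
alpha = 40  # assumed confidence weight, chosen only for illustration

# hypothetical interaction counts Rui (rows are users, columns are items)
R = [
    [3, 0, 0],
    [0, 1, 0],
    [2, 5, 0],
    [0, 1, 1],
]

# Pui = 1 if Rui > 0 else 0
P = [[1 if r_ui > 0 else 0 for r_ui in row] for row in R]

# Cui = 1 + alpha x Rui
C = [[1 + alpha * r_ui for r_ui in row] for row in R]

print(P[2])
# [1, 1, 0]
print(C[2])
# [81, 201, 1]
```

Every cell gets a confidence of at least 1, so non-observed pairs still contribute to the loss, only far less than pairs with many interactions.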

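Explicit ALS predicts the rating a user would give an item; implicit ALS predicts whether a user will be interested in an item: Pui = Xu x Yi.T. In other words, the predictions produced by implicit ALS are the P matrix reconstructed from the user and item latent factors. These predictions mostly fall within [0, 1], but they can also be greater than 1 or less than 0; a larger value indicates a stronger preference for the item, and a smaller value the opposite.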
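
The prediction and the confidence-weighted loss can be sketched as follows. The loss form follows the "Collaborative Filtering for Implicit Feedback Datasets" paper referenced below; the function names, the tiny factor matrices and the parameter values are assumptions for illustration, and Y is stored with one row per item.

```python
def predict(x_u, y_i):
    # Pui is approximated by the dot product of the user and item vectors
    return sum(a * b for a, b in zip(x_u, y_i))


def implicit_loss(R, X, Y, alpha, lam):
    loss = 0.0
    # every user-item pair is included, also those with Rui = 0
    for u, row in enumerate(R):
        for i, r_ui in enumerate(row):
            p_ui = 1 if r_ui > 0 else 0
            c_ui = 1 + alpha * r_ui
            loss += c_ui * (p_ui - predict(X[u], Y[i])) ** 2
    # L2 regularization on all latent factors
    reg = sum(v * v for x in X for v in x) + sum(v * v for y in Y for v in y)
    return loss + lam * reg


R = [[2, 0]]                  # one user, two items
X = [[1.0, 0.0]]              # user latent factors (k = 2)
Y = [[1.0, 0.0], [0.0, 1.0]]  # item latent factors, one row per item

print(predict(X[0], Y[0]), predict(X[0], Y[1]))
# 1.0 0.0
print(round(implicit_loss(R, X, Y, alpha=1, lam=0.1), 6))
# 0.3
```

Here the factors reproduce P exactly, so only the regularization term remains; ALS minimizes this loss by alternately fixing Y to solve for X and fixing X to solve for Y.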

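Because ALS is a collaborative filtering method, it also suffers from the cold start problem: it cannot make recommendations for new users or new items. Beyond that, ALS lacks interpretability and makes it hard to incorporate side information (features of the users and items themselves, such as gender, music genre, or category). However, because matrix factorization implicitly performs a kind of clustering, it is relatively robust to noise compared with other approaches.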

Collaborative Filtering for Implicit Feedback Datasets
http://yifanhu.net/PUB/cf.pdf

A Gentle Introduction to Recommender Systems with Implicit Feedback
https://jessesw.com/Rec-System/

Intro to Implicit Matrix Factorization: Classic ALS
http://blog.ethanrosenthal.com/2016/10/19/implicit-mf-part-1/

Spark ML 算法原理剖析
https://github.com/endymecy/spark-ml-source-analysis

Linux commands cookbook

50 Most Frequently Used UNIX / Linux Commands
http://www.thegeekstuff.com/2010/11/50-linux-commands/

Write a Simple Script in Shell

Write a for loop.

$ for pod_name in $(kubectl get pods -l app=swag-worker-test -o jsonpath={..metadata.name}); do kubectl delete pod $pod_name; done
pod "swag-worker-test-67fffcdd5-5hgf3" deleted
pod "swag-worker-test-67fffcdd5-h8jgg" deleted

# you could also write multiple lines
for pod_name in $(kubectl get pods -l app=swag-worker-test -o jsonpath={..metadata.name}); do
    kubectl delete pod $pod_name
done

Write a while true.

# using trap and wait will make your container react immediately to a stop request
$ bash -c "trap : TERM INT; sleep infinity & wait"
# or
$ bash -c "while true; do echo 'I am alive!'; sleep 3600; done"

or

#!/bin/bash
while true; do echo 'I am alive!'; sleep 3600; done

ref:
https://stackoverflow.com/questions/31870222/how-can-i-keep-container-running-on-kubernetes

Set Environment Variables from a File

$ export $(cat .env | grep -v ^# | xargs)

ref:
https://stackoverflow.com/questions/19331497/set-environment-variables-from-file

Switch to Another User

# with "-", you get a login environment as if that user had just logged in
$ sudo su - ubuntu

Clear the Content of a File

$ echo -n > /var/log/nginx/error.log

ref:
https://unix.stackexchange.com/questions/88808/empty-the-contents-of-a-file

Pipeline stdout with xargs

$ find . -type f -name "*.yaml" | xargs echo
./configmap.yaml ./pvc.yaml ./service.yaml ./statefulset.yaml

$ find . -type f -name "*.yaml" | xargs -n 1 echo
./configmap.yaml
./pvc.yaml
./service.yaml
./statefulset.yaml

$ find . -type f -name "*.yaml" | xargs -n 2 echo
./configmap.yaml ./pvc.yaml
./service.yaml ./statefulset.yaml

$ redis-cli KEYS "*-*-*-*.reply.celery.pidbox" | xargs -n 100 redis-cli DEL

ref:
https://shapeshed.com/unix-xargs/

Set a Timeout for any Command

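# BusyBox syntax (e.g. older Alpine images); GNU coreutils takes the duration directly: timeout 15 <command>
$ timeout -t 15 celery inspect ping -A app:celery -d celery@$(hostname)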

Run a One-time Command at a Specific Time

at executes commands at a specified time. You may need to install the "at" package manually.

# install
$ sudo apt-get install at

# start
$ sudo atd

# list jobs
$ atq

$ at 00:05
at> echo "123" > /tmp/test.txt

$ at 00:00 18.1.2017
at> DPS_ENV=production /home/ubuntu/.virtualenvs/dps/bin/python /home/ubuntu/dps/manage.py send_emails > /tmp/send_emails.log

Press Control + D to exit at shell.

ref:
https://www.lifewire.com/linux-command-at-4091646
https://tecadmin.net/one-time-task-scheduling-using-at-commad-in-linux/

Pass Arguments to bash when Executing a script Fetched by curl

$ curl -L http://bit.ly/open-the-pod-bay-doors | bash -s -- --tags docker 

ref:
https://stackoverflow.com/a/25563308/885524
https://github.com/vinta/HAL-9000

Change a File's Modify Time

$ touch -m -d '1 Jan 2006 12:34' tmp
$ touch -m -d '1 Jan 2006 12:34' tmp/old_file.txt

ref:
https://www.unixtutorial.org/2008/11/how-to-update-atime-and-mtime-for-a-file-in-unix/

Delete Old Files under a Directory

$ find /data/storage/tmp/* -mtime +2 | xargs rm -Rf
$ find /data/storage/tmp/* -mtime +2 -exec rm {} \;

ref:
http://stackoverflow.com/questions/14731133/how-to-delete-all-files-older-than-3-days-when-argument-list-too-long

Append String to a File

# append
$ echo "the last line" >> README.md

# replace
$ echo "replace all" > README.md

Rename Sub-folders

$ for f in */migrations/; do mv -v "$f" "${f%???????????}south_migrations"; done

ref:
http://unix.stackexchange.com/questions/220176/rename-specific-level-of-sub-folders

List History Commands

$ export HISTTIMEFORMAT="%Y%m%d %T  "
$ history

Make Permission For A File Same As Another File

$ chmod --reference=file1 file2

Find Computer's Public IP

$ wget -qO- http://ipecho.net/plain ; echo

ref:
http://askubuntu.com/questions/95910/command-for-determining-my-public-ip

Compress and Uncompress Files

$ tar czf media-20151010.tar.gz media/
$ s3cmd put media-20151010.tar.gz s3://goeasytaxi/

# decompress
$ tar -xzf media.tar.gz

$ sudo apt-get install zip unzip
$ zip -j -r deps.zip spark_app/src/deps/
$ zip -r hourmasters.zip hourmasters/
$ scp -r -i ~/hourmasters.pem <user>@<host>:/home/ubuntu/hourmasters.zip ~/Desktop/

# decompress
$ unzip stork.1.4.zip
$ gzip -d uwsgi.log.*.gz

$ gzip dps.201701171200.sql
$ gunzip dps.201701171200.sql.gz

Count File Lines

$ wc -l filename.txt

$ wc -l *.py

Find Files by Name or Content

$ find / -name virtualenvwrapper.sh

# search for a string in all files under the current directory, including subdirectories
$ find . | xargs grep 'string'

$ find . -iname '*something*'

$ find *.html | xargs grep 'share_server.html'

# find files containing the string print() in the current directory and its subdirectories
$ grep -rnw "." -e "print()"

$ grep -rnw "." -e "print()" --include=\*.py

ref:
https://stackoverflow.com/questions/16956810/how-do-i-find-all-files-containing-specific-text-on-linux

Find Directories by Name

$ find . -type d -name "*native*" -print

ref:
https://askubuntu.com/questions/153144/how-can-i-recursively-search-for-directory-names-with-a-particular-string-where

List Files by Date

$ ls -lrt

List Files Opened by a Process

$ lsof | grep uwsgi

$ lsof -i | grep LISTEN
$ lsof -i -n -P | grep LISTEN

Extract Content from a File

$ cat uwsgi.log | grep error

Display Contents of All Files in the Current Directory

$ grep . *
$ grep . *.html

List Used Ports

$ netstat -a

# TCP
$ netstat -ntlp | grep uwsgi

# UDP
$ netstat -nulp

Ping a Port

$ curl -I "10.148.70.84:9200"
$ curl -I "192.168.100.10:80"

$ sudo apt-get install nmap
$ nmap -p 4505 54.250.5.176
$ nmap -p 8000 10.10.100.70
$ nmap -p 5672 10.10.100.82

$ telnet 54.250.5.176 4505

ref:
http://stackoverflow.com/questions/12792856/what-ports-does-rabbitmq-use

Show Network Traffic and Bandwidth

$ tcpdump -i eth0

$ sudo apt-get install tcptrack
$ tcptrack -i eth0

ref:
http://askubuntu.com/questions/257263/how-to-display-network-traffic-in-terminal

List Running Processes

# show all processes
$ pstree -a

# also show pid
$ pstree -ap

# list the 10 processes using the most memory
$ ps aux | sort -nk 4 | tail

# list processes matching a name
$ ps aux | grep 'worker process'
$ ps aux | grep uwsgi

# show processes as a tree
$ ps auxf

# search for processes by command name and show them as a tree with their parent processes
$ ps f -opid,cmd -C python

Kill Processes

# list all processes currently in memory
$ ps aux

# match a string
$ ps aux | grep "mongo"

# kill it
$ kill PID

# kill all processes matching a name
$ sudo pkill -f runserver

Store User's Input as a Variable

$ read YOUR_VARIABLE_NAME

$ read name
# you type: vinta

$ echo $name
vinta

ref:
https://canred.blogspot.tw/2013/03/read.html

Show Terminal Size

$ stty size
$ echo $LINES && echo $COLUMNS
59 273 

ref:
https://stackoverflow.com/questions/263890/how-do-i-find-the-width-height-of-a-terminal-window

Functional Programming in Python

lambda

square_func = lambda x: x**2
square_func(2)

# is equivalent to

def square_func(x):
    return x**2

Python's lambda is roughly the counterpart of JavaScript's arrow function, although it is limited to a single expression.

map

Every element of the list is processed by def func(x) individually.
The result is a new list with the same number of elements.

def double(number):
  return number * 2

print(list(map(double, [1, 2, 3, 4])))
# [2, 4, 6, 8]

# is equivalent to

print(list(map(lambda number: number * 2, [1, 2, 3, 4])))

reduce

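The elements of the list are combined two at a time by def func(x, y).
The result is a single value.

from functools import reduce  # reduce() is no longer a built-in in Python 3

def add(x, y):
    return x + y

print(reduce(add, [1, 2, 3, 4]))
# (((1+2)+3)+4) = 10

# is equivalent to

print(reduce(lambda x, y: x + y, [1, 2, 3, 4]))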

filter

Apply def func(x) to every element of the list.
This produces a new list containing only the elements for which def func(x) returns True.
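
A minimal example in the same style as the map and reduce sections; is_even is just an illustrative helper name.

```python
def is_even(number):
    return number % 2 == 0

evens = list(filter(is_even, [1, 2, 3, 4]))
print(evens)
# [2, 4]

# is equivalent to

evens_lambda = list(filter(lambda number: number % 2 == 0, [1, 2, 3, 4]))
print(evens_lambda)
# [2, 4]
```

In Python 3, filter() returns a lazy iterator, which is why the result is wrapped in list() before printing.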

ref:
http://www.vinta.com.br/blog/2015/functional-programming-python.html
http://www.bogotobogo.com/python/python_fncs_map_filter_reduce.php

zip

number_list = [1, 2, 3]
str_list = ['one', 'two', 'three']
list(zip(number_list, str_list))
# [(1, 'one'), (2, 'two'), (3, 'three')]

ref:
https://www.programiz.com/python-programming/methods/built-in/zip