Spark SQL cookbook (Python)

Access SparkSession

from pyspark.sql import SparkSession

# get the default SparkSession instance
spark = SparkSession.builder.getOrCreate()

sc = spark.sparkContext
sc.setLogLevel('INFO')

Access custom configurations

$ spark-submit \
    --master spark://localhost:7077 \
    --properties-file my-properties.conf \
    your_spark_app.py

# in my-properties.conf
# spark.myapp.db_host 192.168.100.10

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
print(sc.getConf().get('spark.myapp.db_host'))

ref:
https://stackoverflow.com/questions/31115881/how-to-load-java-properties-file-and-use-in-spark

Create an RDD

array = [
    1,
    5,
    7,
    2,
    3,
]
rdd = sc.parallelize(array)
df = rdd.toDF('int')
# +-----+
# |value|
# +-----+
# |    1|
# |    5|
# |    7|
# |    2|
# |    3|
# +-----+

Read data from MySQL

$ pyspark --packages "mysql:mysql-connector-java:5.1.41"

url = 'jdbc:mysql://127.0.0.1:3306/albedo'
properties = {'user': 'root', 'password': '123'}
spark.read.jdbc(url, 'app_repostarring', properties=properties)

ref:
https://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases
https://dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-configuration-properties.html

Write data to MySQL

CREATE TABLE recommender_songrecommend (
  id int(11) NOT NULL AUTO_INCREMENT,
  user_id int(11) NOT NULL,
  song_id int(11) NOT NULL,
  score double NOT NULL,
  PRIMARY KEY (id)
) ENGINE=InnoDB AUTO_INCREMENT=1;

matrix = [
    (1, 2, 0.1),
    (3, 4, 0.23456),
    (5, 6, 7.89),
    (7, 8, -10.111213),
]
df = spark.createDataFrame(matrix, ['user', 'item', 'score'])

url = 'jdbc:mysql://192.168.11.200:3306/DATABASE?user=USER&password=PASSWORD&verifyServerCertificate=false&useSSL=false&rewriteBatchedStatements=true'
properties = {'driver': 'com.mysql.jdbc.Driver'}
# rename the columns to match the table definition before appending
df \
    .selectExpr('user AS user_id', 'item AS song_id', 'score') \
    .write.jdbc(url, table='recommender_songrecommend', mode='append', properties=properties)

ref:
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter.jdbc
https://stackoverflow.com/questions/2993251/jdbc-batch-insert-performance/10617768#10617768

Read data from SQLite

$ pyspark --packages "org.xerial:sqlite-jdbc:3.16.1"

from pyspark.sql.functions import lit
from pyspark.sql.types import IntegerType, StructField, StructType, TimestampType

props = {'driver': 'org.sqlite.JDBC', 'date_string_format': 'yyyy-MM-dd HH:mm:ss'}
df = spark.read.jdbc("jdbc:sqlite:db.sqlite3", "app_repostarring", properties=props)

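# min_stargazers_count is assumed to be defined elsewhere
df = df.where(df.stargazers_count >= min_stargazers_count)
df = df.select('from_user_id', 'repo_id', 'created_at')
# toDF() needs one name per selected column
df = df.toDF('user', 'item', 'item_created_at')
df = df.withColumn('rating', lit(1))
# reorder the columns so they line up with the schema below
df = df.select('user', 'item', 'rating', 'item_created_at')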

schema = StructType([
    StructField('user', IntegerType(), nullable=False),
    StructField('item', IntegerType(), nullable=False),
    StructField('rating', IntegerType(), nullable=False),
    StructField('item_created_at', TimestampType(), nullable=False),
])
df = spark.createDataFrame(df.rdd, schema)

ref:
https://github.com/xerial/sqlite-jdbc

java.text.ParseException: Unparseable date: "2016-04-22 17:26:54"
https://github.com/xerial/sqlite-jdbc/issues/88

Read data from parquet

from pyspark.sql.utils import AnalysisException

raw_df_filepath = 'raw_df.parquet'

try:
    raw_df = spark.read.format('parquet').load(raw_df_filepath)
except AnalysisException as exc:
    if 'Path does not exist' in exc.desc:
        raw_df = load_raw_data()
        raw_df.write.format('parquet').save(raw_df_filepath)
    else:
        raise exc

ref:
https://community.hortonworks.com/articles/21303/write-read-parquet-file-in-spark.html

Create a DataFrame

matrix = [
    (1, 1, 1),
    (1, 2, 1),
    (2, 1, 0),
    (3, 1, 1),
    (3, 3, 1),
    (3, 4, 1),
    (4, 1, 0),
    (4, 2, 0),
    (5, 9, 1),
    (5, 5, 0),
]
df = spark.createDataFrame(matrix, ['user', 'item', 'rating'])

ref:
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.SparkSession.createDataFrame

Create a DataFrame with explicit schema

from pyspark.sql.types import *

matrix = [
    ('Alice', 0.5, 5.0),
    ('Bob',   0.2, 92.0),
    ('Tom',   0.0, 122.0),
    ('Zack',  0.1, 1.0),
]
schema = StructType([
    StructField('name', StringType(), nullable=False),
    StructField('prediction', DoubleType(), nullable=False),
    StructField('rating', DoubleType(), nullable=False)
])
df = spark.createDataFrame(matrix, schema)
df.printSchema()

new_df = spark.createDataFrame(someRDD, df.schema)

ref:
https://github.com/awesome-spark/spark-gotchas/blob/master/05_spark_sql_and_dataset_api.md

Create a nested schema

from pyspark.sql.types import ArrayType
from pyspark.sql.types import FloatType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import StructField
from pyspark.sql.types import StructType

recommendation_schema = StructType([
    StructField('item', IntegerType(), nullable=False),
    StructField('rating', FloatType(), nullable=False),
])

user_recommendations_schema = StructType([
    StructField('user', IntegerType(), nullable=False),
    StructField('recommendations', ArrayType(recommendation_schema), nullable=False),
])

matrix = [
    (1, [[100, 1.0], [200, 2.0]]),
    (2, [[300, 3.0]]),
]
df = spark.createDataFrame(matrix, user_recommendations_schema)

df.printSchema()
# root
#  |-- user: integer (nullable = false)
#  |-- recommendations: array (nullable = false)
#  |    |-- element: struct (containsNull = true)
#  |    |    |-- item: integer (nullable = false)
#  |    |    |-- rating: float (nullable = false)

df.show()
# +----+--------------------+
# |user|     recommendations|
# +----+--------------------+
# |   1|[[100,1.0], [200,...|
# |   2|         [[300,3.0]]|
# +----+--------------------+

ref:
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.types

Change schema of a DataFrame

from pyspark.sql.types import DoubleType, IntegerType, StructField, StructType

schema = StructType([
    StructField('user', IntegerType(), nullable=False),
    StructField('item', IntegerType(), nullable=False),
    StructField('rating', DoubleType(), nullable=False),
])
df = spark.createDataFrame(df.rdd, schema)

Get the number of partitions

df.rdd.getNumPartitions()

Split a DataFrame into chunks (partitions)

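def write_to_db(partial_rows):
    # partial_rows is an iterator over the rows of one partition
    # year and week are assumed to be defined in the enclosing scope
    values_tuples = [(row.user, row.item, row.score, year, week) for row in partial_rows]
    result = your_mysql_insert_func(values_tuples)
    return [result, ]

save_result = df \
    .repartition(400) \
    .rdd \
    .mapPartitions(write_to_db) \
    .collect()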

ref:
https://stackoverflow.com/questions/24898700/batching-within-an-apache-spark-rdd-map
https://stackoverflow.com/questions/35370826/using-spark-for-sequential-row-by-row-processing-without-map-and-reduce

Show a DataFrame

# print the schema in a tree format
df.printSchema()

# print top 20 rows
df.show()

# print top 100 rows
df.show(100)

# calculate the descriptive statistics
df.describe().show()
df.describe(['like_count', ]).show()

ref:
http://spark.apache.org/docs/latest/sql-programming-guide.html#untyped-dataset-operations-aka-dataframe-operations

Create a column with a literal value

from pyspark.sql.functions import lit

df = df.withColumn('like', lit(1))

ref:
http://stackoverflow.com/questions/33681487/how-do-i-add-a-new-column-to-a-spark-dataframe-using-pyspark

Return a fraction of a DataFrame

fraction = {
    'u1': 0.5,
    'u2': 0.5,
    'u3': 0.5,
    'u4': 0.5,
    'u5': 0.5,
}
df.sampleBy('user', fraction).show()
# +----+----+----------+
# |user|item|prediction|
# +----+----+----------+
# |  u1|  i1|         1|
# |  u3|  i4|         4|
# |  u4|  i1|         1|
# |  u4|  i2|         3|
# |  u5|  i5|         5|
# +----+----+----------+

Show distinct values of a column

df.select('user').distinct().show()

ref:
https://www.analyticsvidhya.com/blog/2016/10/spark-dataframe-and-operations/

Rename columns

df.printSchema()
# root
# |-- from_user_id: integer (nullable = true)
 # |-- repo_id: integer (nullable = true)

df = df.toDF('user', 'item')
# or
df = df.withColumnRenamed('from_user_id', 'user').withColumnRenamed('repo_id', 'item')

Convert a column to double type

df = df.withColumn('prediction', df['prediction'].cast('double'))

ref:
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.Column.cast

Update a column based on conditions

from pyspark.sql.functions import when

df = df.withColumn('label', when(df['prediction'] > 0.5, 1).otherwise(0))

ref:
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.when
http://stackoverflow.com/questions/34908448/spark-add-column-to-dataframe-conditionally

Drop rows with null values from a DataFrame

# dropna() removes rows (not columns) that contain nulls in the given columns
predictions = predictions.dropna(subset=['prediction', ])

ref:
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.dropna

DataFrame subtract another DataFrame

matrix1 = [
    (1, 2, 123),
    (3, 4, 1),
    (5, 6, 45),
    (7, 8, 3424),
]
df1 = spark.createDataFrame(matrix1, ['user','item', 'play_count'])

matrix2 = [
    (3, 4),
    (5, 6),
    (9, 1),
    (7, 8),
    (1, 6),
    (1, 1),
]
df2 = spark.createDataFrame(matrix2, ['user','item'])

df2.subtract(df1.select('user', 'item')).show()
# +----+----+
# |user|item|
# +----+----+
# |   1|   1|
# |   1|   6|
# |   9|   1|
# +----+----+

# or

testing_rdd = df.rdd.subtract(training.rdd)
testing = spark.createDataFrame(testing_rdd, df.schema)

ref:
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.subtract

Convert a DataFrame column into a Python list

# list
popular_items = [row['item'] for row in popular_df.select('item').collect()]

# set
all_items = {row.id for row in als_model.itemFactors.select('id').collect()}

Concatenate (merge) two DataFrames

full_df = df1.union(df2)

Convert a DataFrame to a Python dict

# df must have exactly two columns: the first becomes the key, the second the value
d = df.rdd.collectAsMap()
d['some_key']

Compute (approximate or exact) median of a numerical column

approximate_median = df.approxQuantile('count', [0.5, ], 0.25)
exact_median = df.approxQuantile('count', [0.5, ], 0.0)

maximum = df.approxQuantile('count', [1.0, ], 0.1)
minimum = df.approxQuantile('count', [0.0, ], 0.1)

ref:
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.approxQuantile

Find frequent items for columns

df = rating_df.freqItems(['item', ], support=0.01)
# +--------------------+
# |      item_freqItems|
# +--------------------+
# |[194512, 371798, ...|
# +--------------------+

# get the value of a DataFrame column
popular_items = df.collect()[0].item_freqItems

ref:
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.freqItems

Broadcast a value

bc_candidates = sc.broadcast(set([1, 2, 4, 5, 8]))
print(bc_candidates.value)
# {8, 1, 2, 4, 5}

bc_df = sc.broadcast(df.collect())
df = spark.createDataFrame(bc_df.value)

ref:
http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables

Broadcast a DataFrame in join

import pyspark.sql.functions as F

large_df.join(F.broadcast(small_df), 'some_key')

ref:
http://stackoverflow.com/questions/34053302/pyspark-and-broadcast-join-example
https://chapeau.freevariable.com/2014/09/improving-spark-application-performance.html

Cache a DataFrame

df.cache()

ref:
http://stackoverflow.com/questions/38056774/spark-cache-vs-broadcast

Show query execution plan

df.explain(extended=True)

Use SQL to query a DataFrame

props = {'driver': 'org.sqlite.JDBC', 'date_string_format': 'yyyy-MM-dd HH:mm:ss'}
df = spark.read.jdbc("jdbc:sqlite:db.sqlite3", "app_repostarring", properties=props)
df.createOrReplaceTempView('repo_stars')

query = 'SELECT DISTINCT repo_id AS item FROM repo_stars WHERE stargazers_count > 1000'
df2 = spark.sql(query)
df2.show()

query = """
SELECT
    from_user_id AS user,
    count(repo_id) AS count
FROM repo_stars
GROUP BY from_user_id
ORDER BY count DESC
"""
df = spark.sql(query)

params = {'top_n': top_n}
query = """
SELECT 
    repo_id AS item,
    MAX(stargazers_count) AS stars
FROM repo_stars
GROUP BY repo_id
ORDER BY stars DESC
LIMIT {top_n}
""".format(**params)
popular_df = spark.sql(query)

ref:
https://sparkour.urizone.net/recipes/using-sql-udf/

WHERE ... IN ...

from pyspark.sql.functions import col

item_ids = [1, 2, 5, 8]
# collectAsMap() is an RDD method and needs (key, value) pairs
raw_df \
    .where(col('repo_id').isin(item_ids)) \
    .select('repo_id', 'repo_url') \
    .rdd \
    .collectAsMap()

ORDER BY multiple columns

import pyspark.sql.functions as F

rating_df = raw_df \
    .selectExpr('from_user_id AS user', 'repo_id AS item', '1 AS rating', 'starred_at') \
    .orderBy('user', F.col('starred_at').desc())

Aggregate

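import pyspark.sql.functions as F

# use the Spark SQL functions, not the Python built-ins min() and max()
df.agg(F.min('user'), F.max('user'), F.min('item'), F.max('item')).show()

max_value = user_star_count_df.agg(F.max('stars')).collect()[0]['max(stars)']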

SELECT COUNT(DISTINCT xxx) ...

import pyspark.sql.functions as F

matrix = [
    (1, 1, 1),
    (1, 2, 1),
    (2, 1, 0),
    (3, 1, 1),
    (3, 3, 1),
    (3, 4, 1),
    (4, 1, 0),
    (4, 2, 0),
    (5, 9, 1),
    (5, 5, 0),
]
df = spark.createDataFrame(matrix, ['user', 'item', 'rating'])
df.agg(F.countDistinct('user')).show()
# +--------------------+
# |count(DISTINCT user)|
# +--------------------+
# |                   5|
# +--------------------+

ref:
http://stackoverflow.com/questions/40888946/spark-dataframe-count-distinct-values-of-every-column

SELECT MAX(xxx) ... GROUP BY

df.groupBy('user').count().filter('count >= 4').show()

popular_df = raw_df \
    .groupBy('repo_id') \
    .agg(F.max('stargazers_count').alias('stars')) \
    .orderBy('stars', ascending=False) \
    .limit(top_n)

ref:
http://stackoverflow.com/questions/30616380/spark-how-to-count-number-of-records-by-key

SELECT COUNT() ... GROUP BY

prediction_df.groupBy('user').count().show()
# +------+-----+
# |  user|count|
# +------+-----+
# |649661|   15|
# |464340|   15|
# |468780|   15|
# |489233|   11|
# |455143|   14|
# +------+-----+

stargazers_count_df = rating_df \
    .groupBy('item') \
    .agg(F.count('user').alias('stargazers_count')) \
    .orderBy('stargazers_count', ascending=False)
# +--------+----------------+
# |    item|stargazers_count|
# +--------+----------------+
# | 3544424|             137|
# | 2126244|             120|
# | 7691631|             115|
# | 1362490|             112|
# |  943149|             112|
# +--------+----------------+

starred_count_df = rating_df \
    .groupBy('user') \
    .agg(F.count('item').alias('starred_count')) \
    .orderBy('starred_count', ascending=False)
# +-------+-------------+
# |   user|starred_count|
# +-------+-------------+
# |  48936|         7201|
# |4560482|         5898|
# |3382565|         3978|
# |  10652|         3586|
# |  31996|         3459|
# +-------+-------------+

You may want to use approx_count_distinct.

GROUP_CONCAT a column

from pyspark.sql.functions import expr

per_user_predictions_df = output_df \
  .orderBy(['user', 'prediction'], ascending=False) \
  .groupBy('user') \
  .agg(expr('collect_list(item) as items'))
# +--------+--------------------+
# |    user|               items|
# +--------+--------------------+
# |    2142|[36560369, 197450...|
# |   47217|[40693501, 643554...|
# +--------+--------------------+

ref:
http://stackoverflow.com/questions/31640729/spark-sql-replacement-for-mysql-group-concat-aggregate-function

GROUP_CONCAT multiple columns

from pyspark.sql.functions import col, collect_list, struct

matrix = [
    (1, 1, 0.1),
    (1, 2, 5.1),
    (1, 6, 0.0),
    (2, 6, 9.3),
    (3, 1, 0.54),
    (3, 5, 0.83),
    (4, 1, 0.65),
    (4, 4, 1.023),
]
df = spark.createDataFrame(matrix, ['user', 'item', 'prediction'])

df \
    .groupBy("user") \
    .agg(collect_list(struct(col('item'), col('prediction'))).alias("recommendations")) \
    .show(truncate=False)
# +----+---------------------------+
# |user|recommendations            |
# +----+---------------------------+
# |1   |[[1,0.1], [2,5.1], [6,0.0]]|
# |3   |[[1,0.54], [5,0.83]]       |
# |2   |[[6,9.3]]                  |
# |4   |[[1,0.65], [4,1.023]]      |
# +----+---------------------------+

ref:
https://stackoverflow.com/questions/37737843/aggregating-multiple-columns-with-custom-function-in-spark

SELECT ... RANK() OVER (PARTITION BY ... ORDER BY)

from pyspark.sql import Window
from pyspark.sql.functions import col
from pyspark.sql.functions import expr
import pyspark.sql.functions as F

window_spec = Window.partitionBy('from_user_id').orderBy(col('starred_at').desc())
per_user_actual_items_df = raw_df \
    .select('from_user_id', 'repo_id', 'starred_at', F.rank().over(window_spec).alias('rank')) \
    .where('rank <= 10') \
    .groupBy('from_user_id') \
    .agg(expr('collect_list(repo_id) as items')) \
    .withColumnRenamed('from_user_id', 'user')
# +--------+--------------------+
# |    user|               items|
# +--------+--------------------+
# |    2142|[29122050, 634846...|
# |   59990|[9820191, 8729416...|
# +--------+--------------------+

window_spec = Window.partitionBy('user').orderBy(col('prediction').desc())
per_user_predicted_items_df = output_df \
    .select('user', 'item', 'prediction', F.rank().over(window_spec).alias('rank')) \
    .where('rank <= 10') \
    .groupBy('user') \
    .agg(expr('collect_list(item) as items'))
# +--------+--------------------+
# |    user|               items|
# +--------+--------------------+
# |    2142|[36560369, 197450...|
# |   47217|[40693501, 643554...|
# +--------+--------------------+

ref:
https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
https://github.com/awesome-spark/spark-gotchas/blob/master/05_spark_sql_and_dataset_api.md#window-functions

Left anti join / Left excluding join

clean_df = rating_df.join(to_remove_items, 'item', 'left_anti')

ref:
https://www.codeproject.com/Articles/33052/Visual-Representation-of-SQL-Joins

Outer join

m1 = [
    (1, 1, 1),
    (1, 2, 1),
    (1, 4, 1),
    (2, 2, 1),
    (2, 3, 1),
    (3, 5, 1),
]
m1df = spark.createDataFrame(m1, ['user', 'item', 'rating'])
m1df = m1df.where('user = 1').alias('m1df')

m2 = [
    (1, 100),
    (2, 200),
    (3, 300),
    (4, 400),
    (5, 500),
    (6, 600),
]
m2df = spark.createDataFrame(m2, ['item', 'count'])
m2df = m2df.alias('m2df')

m1df.join(m2df, m1df.item == m2df.item, 'rightouter') \
    .where('m1df.user IS NULL') \
    .orderBy('m2df.count', ascending=False) \
    .selectExpr('1 AS user', 'm2df.item', '0 AS rating') \
    .show()
# +----+----+------+
# |user|item|rating|
# +----+----+------+
# |   1|   6|     0|
# |   1|   5|     0|
# |   1|   3|     0|
# +----+----+------+

ref:
https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-sql-joins.html

Cross join

df = m1df.select('user').distinct().crossJoin(m2df.select('item'))
df.show()

query = """
SELECT f1.user, f2.item, 0 AS rating
FROM f1
CROSS JOIN f2
"""
df = spark.sql(query)
df.show()

all_user_item_pair_df = als_model.userFactors.selectExpr('id AS user') \
    .crossJoin(als_model.itemFactors.selectExpr('id AS item'))
# +----+----+
# |user|item|
# +----+----+
# |xxxx|oooo|
# +----+----+

ref:
http://stackoverflow.com/questions/5464131/finding-pairs-that-do-not-exist-in-a-different-table

Spark RDD methods (Python / Scala)

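Some of the methods below work on any RDD, while others are only available on PairRDDs. I have used both the Python API and the Scala API of Spark in different projects, so the examples below mix the two. With the Scala API, pay close attention to the types each method accepts and returns; the Python API has no such restriction because Python is dynamically typed.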

ref:
http://spark.apache.org/docs/latest/programming-guide.html#transformations
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions

RDD Methods

map()

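The signature is def map[U](f: (T) ⇒ U): RDD[U]

The function passed to map() receives each element of the RDD (each row, from a DataFrame's point of view) as func(row) and returns an arbitrary object, such as an int, a string, or a tuple. The result of map() is therefore an RDD with the same number of rows, though possibly of a different type.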

matrix = [
    (1, 1, 1),
    (1, 2, 1),
    (1, 3, 1),
    (1, 6, 0),
    (2, 6, 1),
    (3, 1, 1),
    (3, 5, 1),
    (4, 1, 0),
    (4, 4, 1),
]
df = spark.createDataFrame(matrix, ['k', 'v', 'whatever'])

df.rdd.map(lambda row: (row[0], row[1])).collect()
# [(1, 1), (1, 2), (1, 3), (1, 6), (2, 6), (3, 1), (3, 5), (4, 1), (4, 4)]

import org.apache.spark.rdd.RDD

val rdd: RDD[(String, Int)] = sc.parallelize(Seq(
  ("Python", 1),
  ("Scala", 3),
  ("Java", 2)
))
val test: RDD[String] = rdd.map({
  case (lang, count) => {
    s"${lang}_${count}"
  }
})
test.collect().foreach(println)
// Python_1
// Scala_3
// Java_2

flatMap()

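The signature is def flatMap[U](f: (T) ⇒ TraversableOnce[U]): RDD[U]

flatMap() is similar to map(): its function also receives each row of the RDD as func(row). The difference is that the function must return an iterable (for example a tuple or a list, which may be empty), and flatMap() flattens the returned results. The count() after flatMap() can therefore be larger or smaller than that of the original RDD.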

matrix = [
    (1, 1, 1),
    (1, 2, 1),
    (1, 3, 1),
    (1, 6, 0),
    (2, 6, 1),
    (3, 1, 1),
    (3, 5, 1),
    (4, 1, 0),
    (4, 4, 1),
]
df = spark.createDataFrame(matrix, ['k', 'v', 'whatever'])

df.rdd.flatMap(lambda row: (row[0], row[1])).collect()
# [1, 1, 1, 2, 1, 3, 1, 6, 2, 6, 3, 1, 3, 5, 4, 1, 4, 4]

df.rdd.flatMap(lambda row: (row[2], )).collect()
# [1, 1, 1, 0, 1, 1, 1, 0, 1]

ref:
http://backtobazics.com/big-data/spark/apache-spark-flatmap-example/
http://apachesparkbook.blogspot.tw/2015/05/difference-between-map-flatmap-in-rdd.html
http://stackoverflow.com/questions/42867457/map-each-element-of-a-list-in-spark
http://stackoverflow.com/questions/21096432/list-or-iterator-of-tuples-returned-by-map-pyspark

reduce()

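The signature is def reduce(f: (T, T) ⇒ T): T

The function passed to reduce() receives two elements of the RDD, func(element1, element2), and returns a single object of the same type as its arguments. The whole reduce() finally yields a single value. Because partitions are reduced in parallel, the function should be commutative and associative.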

array = [
    1,
    5,
    4,
    2,
    3,
]
rdd = sc.parallelize(array)

rdd.reduce(lambda element1, element2: element1 + element2)
# 15

def max(element1, element2):
    return element1 if element1 > element2 else element2

rdd.reduce(max)
# 5

treeReduce()

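The signature is def treeReduce(f: (T, T) ⇒ T, depth: Int = 2): T

A plain reduce() sends the reduce result of every partition straight back to the driver machine for the final computation, which can become a bottleneck when there are many partitions and each one holds a lot of data. In that case you can switch to treeReduce(), although it can backfire when misused. aggregate() and treeAggregate() have the same relationship.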

ref:
https://umbertogriffo.gitbooks.io/apache-spark-best-practices-and-tuning/content/treereduce_and_treeaggregate_demystified.html
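
The difference between the two strategies can be sketched in plain Python, without Spark. This is only a rough model under stated assumptions: partitions are ordinary lists, and the fan-in parameter `scale` is a hypothetical stand-in for the way Spark derives its merge rounds from `depth`. It is not Spark's actual implementation; it only shows why fewer partial results reach the driver.

```python
from functools import reduce

def flat_reduce(partitions, func):
    """Model of reduce(): every partition result goes straight to the driver."""
    partials = [reduce(func, part) for part in partitions if part]
    # the driver has to merge len(partials) values by itself
    return reduce(func, partials), len(partials)

def tree_reduce(partitions, func, scale=2):
    """Model of treeReduce(): merge partial results in rounds before the driver."""
    partials = [reduce(func, part) for part in partitions if part]
    # each round merges groups of `scale` partial results (done by executors in Spark)
    while len(partials) > scale:
        partials = [
            reduce(func, partials[i:i + scale])
            for i in range(0, len(partials), scale)
        ]
    # only the survivors of the last round reach the driver
    return reduce(func, partials), len(partials)

partitions = [[1, 2], [3, 4], [5, 6], [7, 8], [9, 10], [11, 12], [13, 14], [15, 16]]
add = lambda a, b: a + b

flat_total, flat_driver_inputs = flat_reduce(partitions, add)
tree_total, tree_driver_inputs = tree_reduce(partitions, add)
print(flat_total, flat_driver_inputs)
# 136 8
print(tree_total, tree_driver_inputs)
# 136 2
```

Both strategies give the same total, but in this model the driver merges 8 values with the flat strategy and only 2 with the tree strategy. The extra rounds are not free, which is one reason treeReduce() can be slower on small jobs.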

aggregate()

The signature is def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U): U

import org.apache.spark.rdd.RDD

case class WikipediaArticle(title: String, text: String) {
  def mentionsLanguage(lang: String): Boolean = text.split(' ').contains(lang)
}

val wikiRdd: RDD[WikipediaArticle] = sc.parallelize(Seq(
  WikipediaArticle("a1", "abc Scala xyz"),
  WikipediaArticle("a2", "abc Python xyz"),
  WikipediaArticle("a3", "abc Scala xyz"),
  WikipediaArticle("a4", "abc Scala xyz")
))

def occurrencesOfLang(lang: String, rdd: RDD[WikipediaArticle]): Int = {
  val accumulateOp = (total: Int, a: WikipediaArticle) => total + 1
  val combineOp = (count1: Int, count2: Int) => count1 + count2
  rdd.filter(_.mentionsLanguage(lang)).aggregate(0)(accumulateOp, combineOp)
}

occurrencesOfLang("Scala", wikiRdd)
// 3

ref:
https://stackoverflow.com/questions/28240706/explain-the-aggregate-functionality-in-spark
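
To make the roles of zeroValue, seqOp and combOp more concrete, here is a plain-Python model of aggregate() that runs without Spark. It is a sketch under assumptions: the helper `aggregate` and the partition layout are made up for illustration, and a real RDD decides the partitioning itself. The example computes a (sum, count) pair, so the result type U differs from the element type T.

```python
from functools import reduce

def aggregate(partitions, zero_value, seq_op, comb_op):
    """Plain-Python model of RDD.aggregate(); `partitions` is a list of lists."""
    # seq_op folds each element (type T) into a per-partition accumulator (type U)
    partials = [reduce(seq_op, part, zero_value) for part in partitions]
    # comb_op merges the per-partition accumulators (type U) into one result
    return reduce(comb_op, partials, zero_value)

# hypothetical partitioning of the values 1, 5, 4, 2, 3
partitions = [[1, 5], [4], [2, 3]]

total, count = aggregate(
    partitions,
    (0, 0),                                   # zeroValue: (sum, count)
    lambda acc, x: (acc[0] + x, acc[1] + 1),  # seqOp: (U, T) => U
    lambda a, b: (a[0] + b[0], a[1] + b[1]),  # combOp: (U, U) => U
)
print(total, count, total / count)
# 15 5 3.0
```

The zero value must be neutral for both functions, because it is used once per partition and once more when the partial results are combined.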

PairRDD Methods

A PairRDD is an RDD that contains key/value pairs and looks like this: RDD[(K, V)]. K and V can be primitive types, or other objects and collections. The methods in PairRDDFunctions generally return an RDD[(K, V)] as well.

ref:
https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html

mapValues()

The signature is def mapValues[U](f: (V) ⇒ U): RDD[(K, U)]

The function passed to mapValues() takes a single argument, the V of the PairRDD; it only transforms the values and leaves the keys untouched.

val rdd = sc.parallelize(Seq(
  ("Python", 1),
  ("Scala", 3),
  ("Java", 2)
))

rdd
  .mapValues((count: Int) => {
    count * 10
  })
  .collect()
// Array((Python,10), (Scala,30), (Java,20))

ref:
http://apachesparkbook.blogspot.tw/2015/12/mapvalues-example.html

groupByKey()

The signature is def groupByKey(): RDD[(K, Iterable[V])]

Avoid groupByKey() in the following situations:

  • If the operation is expressed using groupByKey followed by an associative and commutative reducing operation on values (sum, count, max / min), it should be replaced by reduceByKey.
  • If the operation can be expressed using a combination of a local sequence operation and a merge operation (online variance / mean, top-n observations), it should be expressed with combineByKey or aggregateByKey.
  • If the final goal is to traverse values in a specific order (groupByKey followed by sorting values followed by iteration), it can typically be rewritten as repartitionAndSortWithinPartitions with a custom partitioner and ordering, followed by mapPartitions.

case class WikipediaArticle(title: String, text: String) {
  def mentionsLanguage(lang: String): Boolean = text.split(' ').contains(lang)
}

val langs = List(
  "Java", "PHP", "Python", "Scala"
)

val wikiRdd: RDD[WikipediaArticle] = sc.parallelize(Seq(
  WikipediaArticle("a1", "a1 Scala"),
  WikipediaArticle("a2", "a2 Python"),
  WikipediaArticle("a3", "a3 Scala"),
  WikipediaArticle("a4", "a4 Java"),
  WikipediaArticle("a5", "a5 Java"),
  WikipediaArticle("a6", "a6 Scala")
))

def makeIndex(langs: List[String], rdd: RDD[WikipediaArticle]): RDD[(String, Iterable[WikipediaArticle])] = {
  rdd
    .flatMap((article: WikipediaArticle) => {
      langs
        .filter((lang: String) => {
          article.mentionsLanguage(lang)
        })
        .map((lang: String) => {
          (lang, article)
        })
    })
    .groupByKey()
}

def index: RDD[(String, Iterable[WikipediaArticle])] = makeIndex(langs, wikiRdd)
// Array(
//   (Java,CompactBuffer(WikipediaArticle(a4,a4 Java), WikipediaArticle(a5,a5 Java))),
//   (Python,CompactBuffer(WikipediaArticle(a2,a2 Python))),
//   (Scala,CompactBuffer(WikipediaArticle(a1,a1 Scala), WikipediaArticle(a3,a3 Scala), WikipediaArticle(a6,a6 Scala)))
// )

def rankLangsUsingIndex(index: RDD[(String, Iterable[WikipediaArticle])]): List[(String, Int)] = {
  index
    .map({
      case (lang, articles) => {
        (lang, articles.size)
      }
    })
    .sortBy(-_._2)
    .collect()
    .toList
}

rankLangsUsingIndex(index)
// List((Scala,3),(Java,2),(Python,1))
// PHP does not appear because no article mentions it

Spark Best Practices
https://github.com/beeva/beeva-best-practices/blob/master/big_data/spark/README.md

RDD actions and Transformations by Example
https://github.com/awesome-spark/spark-gotchas/blob/master/04_rdd_actions_and_transformations_by_example.md

Avoid groupByKey when performing an associative reductive operation
https://umbertogriffo.gitbooks.io/apache-spark-best-practices-and-tuning/content/avoid_groupbykey_when_performing_an_associative_re.html

reduceByKey()

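The signature is def reduceByKey(func: (V, V) ⇒ V): RDD[(K, V)]

Both arguments of the function passed to reduceByKey() have the type of the PairRDD's V. For the same result, reduceByKey() runs much more efficiently than groupByKey() + reduce(), because it combines values within each partition before shuffling.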
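
The efficiency gap mostly comes from how many records cross the shuffle. The plain-Python model below illustrates this without Spark; the helper names and the three-partition layout are assumptions made up for this sketch, and a real job partitions the data differently. It counts shuffled records for a groupByKey-style sum and a reduceByKey-style sum over the same pairs.

```python
from collections import defaultdict

def shuffle_group_by_key(partitions):
    """Model of groupByKey() + sum: every (key, value) record crosses the shuffle."""
    shuffled = [pair for part in partitions for pair in part]
    grouped = defaultdict(list)
    for key, value in shuffled:
        grouped[key].append(value)
    # the values are only summed after all of them have been moved
    return {key: sum(values) for key, values in grouped.items()}, len(shuffled)

def shuffle_reduce_by_key(partitions, func):
    """Model of reduceByKey(): combine inside each partition first, then shuffle."""
    shuffled = []
    for part in partitions:
        local = {}
        for key, value in part:
            local[key] = func(local[key], value) if key in local else value
        # at most one record per key leaves each partition
        shuffled.extend(local.items())
    merged = {}
    for key, value in shuffled:
        merged[key] = func(merged[key], value) if key in merged else value
    return merged, len(shuffled)

# hypothetical layout of (k, v) pairs across three partitions
partitions = [
    [(1, 1), (1, 2), (1, 3)],
    [(1, 6), (2, 6), (3, 1)],
    [(3, 5), (4, 1), (4, 4)],
]

grouped_sums, grouped_records = shuffle_group_by_key(partitions)
reduced_sums, reduced_records = shuffle_reduce_by_key(partitions, lambda x, y: x + y)
print(grouped_sums == reduced_sums, grouped_records, reduced_records)
# True 9 6
```

The saving grows with the number of repeated keys per partition; when every key is unique within its partition, both approaches shuffle the same number of records.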

matrix = [
    (1, 1, 1),
    (1, 2, 1),
    (1, 3, 1),
    (1, 6, 0),
    (2, 6, 1),
    (3, 1, 1),
    (3, 5, 1),
    (4, 1, 0),
    (4, 4, 1),
]
df = spark.createDataFrame(matrix, ['k', 'v', 'whatever'])

df.rdd \
    .map(lambda row: (row[0], row[1])) \
    .reduceByKey(lambda x, y: x + y) \
    .collect()
# map() => [(1, 1), (1, 2), (1, 3), (1, 6), (2, 6), (3, 1), (3, 5), (4, 1), (4, 4)]
# reduceByKey() => [(1, 12), (2, 6), (3, 6), (4, 5)]

matrix = [
    (1, [[100, 1.0], [200, 2.0]]),
    (2, [[300, 3.0]]),
    (2, [[400, 4.0], [600, 6.0]]),
    (2, [[500, 5.0]]),
]
df = spark.createDataFrame(matrix, ['user', 'recommendations'])

def merge_recommendations(recommendations1, recommendations2):
    return recommendations1 + recommendations2

def slice_recommendations(row, candidate_k):
    user, recommendations = row
    # each recommendation is an [item, rating] pair, so sort by index 1 (the rating)
    sliced_recommendations = sorted(recommendations, key=lambda recommendation: recommendation[1], reverse=True)[:candidate_k + 5]
    return (user, sliced_recommendations)

# candidate_k is assumed to be defined elsewhere
full_rdd = df \
    .rdd \
    .reduceByKey(lambda x, y: merge_recommendations(x, y)) \
    .map(lambda row: slice_recommendations(row, candidate_k))
final_df = spark.createDataFrame(full_rdd, ['user', 'recommendations'])

val langs = List(
  "Java", "PHP", "Python", "Scala"
)

val wikiRdd: RDD[WikipediaArticle] = sc.parallelize(Seq(
  WikipediaArticle("a1", "a1 Scala"),
  WikipediaArticle("a2", "a2 Python"),
  WikipediaArticle("a3", "a3 Scala"),
  WikipediaArticle("a4", "a4 Java"),
  WikipediaArticle("a5", "a5 Java"),
  WikipediaArticle("a6", "a6 Scala")
))

def rankLangsReduceByKey(langs: List[String], rdd: RDD[WikipediaArticle]): List[(String, Int)] = {
  rdd
    .flatMap((article: WikipediaArticle) => {
      langs
        .filter((lang: String) => {
          article.mentionsLanguage(lang)
        })
        .map((lang: String) => {
          (lang, 1)
        })
    })
    .reduceByKey((count1: Int, count2: Int) => {
      count1 + count2
    })
    .collect()
    .toList
    .sortWith(_._2 > _._2)
}

rankLangsReduceByKey(langs, wikiRdd)
// List((Scala,3),(Java,2),(Python,1))
// PHP does not appear because no article mentions it

Avoid reduceByKey when the input and output value types are different
http://backtobazics.com/big-data/spark/apache-spark-reducebykey-example/

Reduce a key-value pair into a key-list pair
http://stackoverflow.com/questions/27002161/reduce-a-key-value-pair-into-a-key-list-pair-with-apache-spark

foldByKey()

The signature is def foldByKey(zeroValue: V)(func: (V, V) ⇒ V): RDD[(K, V)]

foldByKey() is essentially reduceByKey() with a zero value that you specify yourself.

matrix = [
    (1, 1, 1),
    (1, 2, 1),
    (1, 3, 1),
    (1, 6, 0),
    (2, 6, 1),
    (3, 1, 1),
    (3, 5, 1),
    (4, 1, 0),
    (4, 4, 1),
]
df = spark.createDataFrame(matrix, ['k', 'v', 'whatever'])

from operator import add

df.rdd \
    .map(lambda row: (row[0], [row[1], ])) \
    .foldByKey(list(), add) \
    .collect()
# [(4, [1, 4]), (1, [1, 2, 3, 6]), (2, [6]), (3, [1, 5])]

aggregateByKey()

The signature is def aggregateByKey[U](zeroValue: U)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U): RDD[(K, U)]

It is a good replacement for the .map(lambda row: (row['user'], [row['item'], ])).reduceByKey(lambda v1, v2: v1 + v2) pattern.

matrix = [
    (1, 1, 1),
    (1, 2, 1),
    (1, 4, 1),
    (1, 5, 1),
    (2, 2, 1),
    (2, 2, 1),
    (2, 3, 1),
    (3, 5, 1),
]
df = spark.createDataFrame(matrix, ['k', 'v', 'whatever'])

def seqFunc(item_set, item):
    item_set.add(item)
    return item_set

def combFunc(item_set1, item_set2):
    return item_set1.union(item_set2)

# the DataFrame above names its columns 'k' and 'v'
df.select('k', 'v').rdd \
    .aggregateByKey(set(), seqFunc, combFunc) \
    .collect()
# [(1, {1, 2, 4, 5}), (2, {2, 3}), (3, {5})]

ref:
http://codingjunkie.net/spark-agr-by-key/
https://stackoverflow.com/questions/31081563/apache-spark-what-is-the-equivalent-implementation-of-rdd-groupbykey-using-rd
https://stackoverflow.com/questions/24804619/how-does-spark-aggregate-function-aggregatebykey-work

combineByKey()

The signature is def combineByKey[C](createCombiner: (V) ⇒ C, mergeValue: (C, V) ⇒ C, mergeCombiners: (C, C) ⇒ C): RDD[(K, C)]

It turns an RDD[(K, V)] into an RDD[(K, C)], where C can be any type.

combineByKey() takes three functions:

  • createCombiner, which turns a V into a C (e.g., creates a one-element list)
  • mergeValue, to merge a V into a C (e.g., adds it to the end of a list)
  • mergeCombiners, to combine two C’s into a single one.

import pyspark.sql.functions as F

matrix = [
    ('chinese', 80),
    ('math', 60),
    ('english', 100),
    ('chinese', 90),
    ('math', 100),
    ('math', 10),
    ('english', 70),
    ('english', 20),
    ('history', 30),
]
df = spark.createDataFrame(matrix, ['subject', 'score'])

def createCombiner(score):
    return (score, 1)

def mergeValue(accumulate, score):
    total_score = accumulate[0] + score
    total_count = accumulate[1] + 1
    return (total_score, total_count)

def mergeCombiners(accumulate1, accumulate2):
    total_score = accumulate1[0] + accumulate2[0]
    total_count = accumulate1[1] + accumulate2[1]
    return (total_score, total_count)

df.rdd.combineByKey(createCombiner, mergeValue, mergeCombiners).collect()
# you could calculate the average score of every subject
# [('chinese', (170, 2)),
#  ('history', (30, 1)),
#  ('math', (170, 3)),
#  ('english', (190, 3))]

ref:
https://zhangyi.gitbooks.io/spark-in-action/content/chapter2/rdd.html
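
The (total_score, total_count) pairs above are one division away from the average score. A minimal sketch in plain Python, working on the collected list; on the RDD itself the same step could be written as .mapValues(lambda acc: acc[0] / acc[1]) before .collect().

```python
# the (total_score, total_count) pairs collected above
collected = [
    ('chinese', (170, 2)),
    ('history', (30, 1)),
    ('math', (170, 3)),
    ('english', (190, 3)),
]

# average = total score / number of scores, per subject
averages = {subject: total / count for subject, (total, count) in collected}
print(averages['chinese'])
# 85.0
```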

cogroup()

If you need to join two RDDs that have already been grouped by key, use cogroup(). Avoid the flatMap + join + groupBy pattern.
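
As a rough picture of what cogroup() returns, here is a plain-Python model: for every key that appears in either input, you get one group of values from each side. The helper and the sample pairs are hypothetical. In PySpark the two groups come back as iterables rather than lists.

```python
from collections import defaultdict

def cogroup(pairs_a, pairs_b):
    """Plain-Python model of rdd_a.cogroup(rdd_b), for illustration only."""
    grouped = defaultdict(lambda: ([], []))
    for key, value in pairs_a:
        grouped[key][0].append(value)  # values from the first input
    for key, value in pairs_b:
        grouped[key][1].append(value)  # values from the second input
    return dict(grouped)

# hypothetical (user, item) pairs from two different sources
starred = [(1, 'a'), (1, 'b'), (2, 'c')]
watched = [(1, 'x'), (3, 'y')]

print(cogroup(starred, watched))
# {1: (['a', 'b'], ['x']), 2: (['c'], []), 3: ([], ['y'])}
```

Keys that exist on only one side still show up, paired with an empty group, which is why no separate outer join is needed.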

Setup Spark on macOS

Install

First, you need the Java 8 JDK.
http://www.oracle.com/technetwork/java/javase/downloads/index.html

$ java -version
java version "1.8.0_131"

Homebrew Version

$ brew update
$ brew install maven apache-spark

Pre-built Version

$ mkdir -p /usr/local/share/apache-spark && 
  cd /usr/local/share/apache-spark && 
  wget https://archive.apache.org/dist/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz && 
  tar -xvzf spark-2.2.0-bin-hadoop2.7.tgz

ref:
http://spark.apache.org/downloads.html

Build Version

This is the recommended way.

$ brew install scala@2.11
$ export PATH="/usr/local/opt/scala@2.11/bin:$PATH"
$ scala -version
Scala code runner version 2.11.8 -- Copyright 2002-2016, LAMP/EPFL

$ mkdir -p /usr/local/share/apache-spark && 
  cd /usr/local/share/apache-spark && 
  wget https://d3kbcqa49mib13.cloudfront.net/spark-2.2.0.tgz && 
  tar -xvzf spark-2.2.0.tgz && 
  cd spark-2.2.0

$ ./build/mvn -Pnetlib-lgpl -Pyarn -Phadoop-2.7 -Dhadoop.version=2.7.3 -DskipTests -T 4C package
# or
$ ./build/mvn -Pnetlib-lgpl -Pyarn -Phadoop-2.7 -Dhadoop.version=2.7.3 -DskipTests clean package
$ spark-shell --packages "com.github.fommil.netlib:all:1.1.2"
scala> import com.github.fommil.netlib.BLAS
import com.github.fommil.netlib.BLAS
scala> BLAS.getInstance().getClass().getName()
res1: String = com.github.fommil.netlib.NativeSystemBLAS

ref:
http://spark.apache.org/downloads.html
http://spark.apache.org/docs/latest/building-spark.html
http://spark.apache.org/docs/latest/ml-guide.html#dependencies

Configurations

in .zshrc

if which java > /dev/null; then
  export JAVA_HOME="$(/usr/libexec/java_home -v 1.8)"
  export PATH="$JAVA_HOME/bin:$PATH"
fi

export PATH="/usr/local/opt/scala@2.11/bin:$PATH"

# homebrew version
export SPARK_HOME="/usr/local/Cellar/apache-spark/2.2.0/libexec"
export PYTHONPATH="$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.4-src.zip:$PYTHONPATH"
export PYSPARK_DRIVER_PYTHON="ipython"

# pre-built version
export SPARK_HOME="/usr/local/share/apache-spark/spark-2.2.0-bin-hadoop2.7"
export PATH="$SPARK_HOME/bin:$PATH"
export PYTHONPATH="$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.4-src.zip:$PYTHONPATH"

# build version
export SPARK_HOME="/usr/local/share/apache-spark/spark-2.2.0"
export PATH="$SPARK_HOME/bin:$PATH"
export PYTHONPATH="$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.4-src.zip:$PYTHONPATH"

ref:
https://spark.apache.org/docs/latest/programming-guide.html
https://spark.apache.org/docs/latest/configuration.html

$ cd $SPARK_HOME

$ cp conf/spark-defaults.conf.template conf/spark-defaults.conf
spark.driver.memory              4g
spark.executor.memory            4g
spark.jars.packages              com.github.fommil.netlib:all:1.1.2,mysql:mysql-connector-java:5.1.41
spark.serializer                 org.apache.spark.serializer.KryoSerializer

$ cp conf/spark-env.sh.template conf/spark-env.sh
export PYTHONHASHSEED=42

$ cp conf/log4j.properties.template conf/log4j.properties

ref:
https://spark.apache.org/docs/latest/configuration.html

Commands

Local Mode

$ spark-shell

$ export PYSPARK_DRIVER_PYTHON="jupyter" && \
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --ip 0.0.0.0" && \
pyspark \
--packages "com.github.fommil.netlib:all:1.1.2,mysql:mysql-connector-java:5.1.41" \
--driver-memory 4g \
--executor-memory 4g \
--master "local[*]"

$ spark-shell \
--packages "com.github.fommil.netlib:all:1.1.2,mysql:mysql-connector-java:5.1.41" \
--master "local-cluster[3, 1, 4096]"

# Spark Application UI on the driver
$ open http://localhost:4040/

ref:
https://spark.apache.org/docs/latest/programming-guide.html

Standalone mode

There are two deploy modes for Spark Standalone. In client mode, the driver is launched in the same process as the client that submits the application. In cluster mode, however, the driver is launched from one of the Worker processes inside the cluster.

$ ./sbin/start-master.sh -h localhost
$ ./sbin/start-slave.sh spark://localhost:7077

# Spark Web UI on the cluster manager
$ open http://localhost:8080/

$ pyspark \
--driver-memory 4g \
--executor-memory 4g \
--master spark://localhost:7077

$ spark-submit \
--master spark://localhost:7077 \
examples/src/main/python/pi.py 10

$ spark-submit \
--driver-memory 2g \
--driver-java-options "-XX:ThreadStackSize=81920" \
--total-executor-cores 3 \
--executor-cores 3 \
--executor-memory 12g \
--conf "spark.executor.extraJavaOptions=-XX:ThreadStackSize=81920" \
--master spark://localhost:7077 \
--packages "mysql:mysql-connector-java:5.1.41,com.hankcs:hanlp:portable-1.3.4,edu.stanford.nlp:stanford-corenlp:3.7.0" \
--jars "/Users/vinta/Projects/albedo/spark-data/stanford-corenlp-3.8.0-models.jar" \
--class ws.vinta.albedo.LogisticRegressionRanker \
target/albedo-1.0.0-SNAPSHOT.jar

# Spark Application UI on the driver
$ open http://localhost:4040/

ref:
https://spark.apache.org/docs/latest/spark-standalone.html
https://spark.apache.org/docs/latest/submitting-applications.html
https://spark.apache.org/docs/latest/configuration.html

Recommender System: Matrix Factorization

Singular Value Decomposition (SVD)

        item_a  item_b  item_c
user_1  2       -       -
user_2  -       1       -
user_3  3       3       -
user_4  -       2       2

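We want to factorize the m x n rating matrix. One of the commonly used methods is Singular Value Decomposition (SVD), which decomposes the rating matrix into three matrices U S V*.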
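
For reference, the shapes involved can be written out as follows. The first line is the full decomposition. The second is the usual truncated form, which keeps only the k largest singular values; the subscript k is notation introduced here, not taken from the text above.

```latex
R_{m \times n} = U_{m \times m} \, S_{m \times n} \, V^{*}_{n \times n}

R \approx U_{k} \, S_{k} \, V_{k}^{*},
\qquad U_{k} \in \mathbb{R}^{m \times k},\;
S_{k} \in \mathbb{R}^{k \times k},\;
V_{k}^{*} \in \mathbb{R}^{k \times n}
```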

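Applications of SVD in recommender systems (in Chinese)
http://yanyiwu.com/work/2012/09/10/SVD-application-in-recsys.html

Principles of singular value decomposition (SVD) and its application to dimensionality reduction (in Chinese)
http://www.cnblogs.com/pinard/p/6251584.html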

Alternating Least Squares (ALS)

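SVD requires a dense matrix, but the rating matrix in a recommender system is usually a very large sparse matrix, so in practice a variant of SVD called Funk-SVD is used instead. Funk-SVD is also known as the Latent Factor Model (LFM): it factorizes the m x n rating matrix R into two matrices U and I, an m x k users matrix and a k x n items matrix, each with k latent features. Only the entries with a non-zero rating are considered in the computation.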

ALS is one way of solving for the U and I matrices. Another is Stochastic Gradient Descent (SGD).
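
To show what "only the observed ratings count" means in practice, here is a minimal plain-Python sketch of the SGD alternative (not ALS itself, and not how Spark implements it). It factorizes the small rating table from the SVD section. The function names and hyperparameters (k, epochs, lr, reg, seed) are illustrative assumptions, and the item factors are stored as an n x k list, i.e. the transpose of the k x n items matrix described above.

```python
import random

# rating matrix from the SVD table; None marks a missing rating
R = [
    [2, None, None],   # user_1
    [None, 1, None],   # user_2
    [3, 3, None],      # user_3
    [None, 2, 2],      # user_4
]

def predict(users, items, u, i):
    # dot product of user u's and item i's k-dimensional factor vectors
    return sum(a * b for a, b in zip(users[u], items[i]))

def squared_error(R, users, items):
    # sum of squared errors over the observed ratings only
    return sum(
        (r - predict(users, items, u, i)) ** 2
        for u, row in enumerate(R)
        for i, r in enumerate(row)
        if r is not None
    )

def train_funk_svd(R, k=2, epochs=2000, lr=0.01, reg=0.02, seed=42):
    rng = random.Random(seed)
    m, n = len(R), len(R[0])
    users = [[rng.uniform(0.1, 0.5) for _ in range(k)] for _ in range(m)]
    items = [[rng.uniform(0.1, 0.5) for _ in range(k)] for _ in range(n)]
    observed = [(u, i, r) for u, row in enumerate(R)
                for i, r in enumerate(row) if r is not None]
    for _ in range(epochs):
        for u, i, r in observed:
            err = r - predict(users, items, u, i)
            for f in range(k):
                uf, vf = users[u][f], items[i][f]
                # gradient step on the regularized squared error
                users[u][f] += lr * (err * vf - reg * uf)
                items[i][f] += lr * (err * uf - reg * vf)
    return users, items

users, items = train_funk_svd(R)
print(round(predict(users, items, 2, 0), 1))  # reconstructed user_3 / item_a
print(round(predict(users, items, 0, 1), 1))  # predicted user_1 / item_b (was missing)
```

ALS optimizes the same kind of objective, but instead of taking small gradient steps it fixes one of the two matrices and solves a least-squares problem for the other, then alternates.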

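The implementation of ALS in Spark MLlib (in Chinese)
http://www.csdn.net/article/2015-05-07/2824641

Applications of matrix factorization in collaborative filtering recommendation algorithms (in Chinese)
http://www.cnblogs.com/pinard/p/6351319.html

Latent factor models based on matrix factorization (in Chinese)
http://www.voidcn.com/blog/winone361/article/p-5031282.html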

ALS for implicit feedbacks

This model is also known as Weighted Regularized Matrix Factorization (WRMF).

        item_a  item_b  item_c
user_1  1       -       -
user_2  -       1       -
user_3  1       1       -
user_4  -       1       1

Notation:

  • Rui = User u's interaction on Item i
  • Xu = User u's vector
  • Yi = Item i's vector
  • Pui = 1 if Rui > 0 else 0
  • Cui = 1 + alpha x Rui

Parameters:

  • iteration: number of iterations
  • rank: number of latent features
  • lambda: regularization parameter
  • alpha: confidence weight

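As with explicit ALS, the m x n implicit feedback matrix R is factorized into two matrices X and Y: an m x k user latent factors matrix and a k x n item latent factors matrix. The difference is that two more matrices are introduced: an m x n matrix P of binary preferences, where Pui indicates whether the user is interested in the item, and an m x n matrix C of confidence (or think of it as strength), where Cui indicates how certain we are that the user is interested in the item. The goal of the algorithm is still to compute the users and items matrices. Compared with explicit ALS, the loss function additionally includes Cui and Pui, and it takes every user-item pair into account, including missing / non-observed values (Rui = 0).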
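
Written with the symbols from the notation list, the loss function from the Hu, Koren and Volinsky paper referenced at the end of this section looks like this. The sum runs over all user-item pairs, observed or not.

```latex
\min_{X, Y} \; \sum_{u, i} C_{ui} \left( P_{ui} - X_u Y_i^{T} \right)^2
+ \lambda \left( \sum_{u} \lVert X_u \rVert^2 + \sum_{i} \lVert Y_i \rVert^2 \right)
```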

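As long as the user has interacted with the item (Rui > 0), we assume the user has a preference for it (Pui = 1). The more interactions there are (the larger Rui), the larger Cui becomes, meaning higher confidence and a more trustworthy assumption. If the user has never interacted with the item (Rui = 0), we assume the user has no preference for it (Pui = 0), but since Cui = 1 + alpha x 0 = 1, this assumption has relatively low confidence and is less trustworthy.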
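
A small sketch of how P and C follow from R, using the implicit feedback table from this section. The value of alpha is purely illustrative; it is a hyperparameter you would tune on your own data.

```python
# implicit feedback matrix R from the table in this section (0 = no interaction)
R = [
    [1, 0, 0],   # user_1
    [0, 1, 0],   # user_2
    [1, 1, 0],   # user_3
    [0, 1, 1],   # user_4
]
alpha = 40  # illustrative value only

# Pui = 1 if Rui > 0 else 0
P = [[1 if r > 0 else 0 for r in row] for row in R]
# Cui = 1 + alpha x Rui
C = [[1 + alpha * r for r in row] for row in R]

print(P[2])  # [1, 1, 0]
print(C[2])  # [41, 41, 1]
```

Non-observed pairs keep the minimum confidence of 1, so they still contribute to the loss, only far less than observed interactions.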

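Explicit ALS predicts the rating a user would give an item. Implicit ALS predicts whether a user would be interested in an item: Pui = Xu x Yi.T. In other words, the predictions output by implicit ALS are the P matrix reconstructed from X x Y.T. These predictions mostly fall within [0, 1], but they can also be greater than 1 or less than 0. The larger the value, the stronger the user's preference for the item, and vice versa.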
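
As a toy illustration of the prediction step, the snippet below scores three items for one user with a dot product and ranks them. The factor vectors are hypothetical numbers chosen by hand, not the output of a trained model.

```python
# hypothetical k=2 latent factors, for illustration only
user_vector = [1.0, 0.5]
item_vectors = {
    'item_a': [0.5, 0.5],
    'item_b': [1.0, 0.5],
    'item_c': [-0.5, 0.0],
}

def score(xu, yi):
    # Pui = Xu x Yi.T
    return sum(a * b for a, b in zip(xu, yi))

scores = {item: score(user_vector, vec) for item, vec in item_vectors.items()}
ranking = sorted(scores, key=scores.get, reverse=True)

print(scores)
# {'item_a': 0.75, 'item_b': 1.25, 'item_c': -0.5}
print(ranking)
# ['item_b', 'item_a', 'item_c']
```

Note that item_b scores above 1 and item_c below 0. Only the relative order matters when recommending.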

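Because ALS is a collaborative filtering method, it also suffers from the cold start problem: it cannot make recommendations for new users or new items. Beyond that, ALS lacks interpretability and makes it hard to incorporate side information (features of the users and items themselves, such as gender, music genre, or category). However, because matrix factorization implicitly performs clustering, it is more robust to noise than other approaches.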

Collaborative Filtering for Implicit Feedback Datasets
http://yifanhu.net/PUB/cf.pdf

A Gentle Introduction to Recommender Systems with Implicit Feedback
https://jessesw.com/Rec-System/

Intro to Implicit Matrix Factorization: Classic ALS
http://blog.ethanrosenthal.com/2016/10/19/implicit-mf-part-1/

An analysis of the principles behind Spark ML algorithms (in Chinese)
https://github.com/endymecy/spark-ml-source-analysis

Calculate the similarity of two vectors

scipy.spatial.distance
https://docs.scipy.org/doc/scipy/reference/spatial.distance.html

sklearn.metrics
http://scikit-learn.org/stable/modules/classes.html#module-sklearn.metrics

Distance

Euclidean distance

from sklearn.metrics.pairwise import euclidean_distances

# inputs must be 2D: one row per vector
euclidean_distances([[0, 0, 0, 0]], [[0, 0, 0, 0]])
# array([[0.]])

euclidean_distances([[1, 0, 1, 0]], [[1, 0, 1, 0]])
# array([[0.]])

euclidean_distances([[0, 1, 0, 1]], [[1, 0, 1, 0]])
# array([[2.]])

ref:
http://scikit-learn.org/stable/modules/generated/sklearn.metrics.pairwise.euclidean_distances.html
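
For a single pair of vectors, the formula behind the library call is short enough to write by hand. A minimal sketch (the function name is made up here):

```python
import math

def euclidean_distance(v1, v2):
    # square root of the sum of squared element-wise differences
    return math.sqrt(sum((a - b) ** 2 for a, b in zip(v1, v2)))

print(euclidean_distance([0, 1, 0, 1], [1, 0, 1, 0]))
# 2.0
```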

Manhattan Distance

from sklearn.metrics.pairwise import manhattan_distances

# inputs must be 2D: one row per vector
manhattan_distances([[0, 0, 0, 0]], [[0, 0, 0, 0]])
# array([[0.]])

manhattan_distances([[1, 1, 1, 0]], [[1, 0, 0, 0]])
# array([[2.]])

manhattan_distances([[0, 1, 0, 1]], [[1, 0, 1, 0]])
# array([[4.]])

ref:
http://scikit-learn.org/stable/modules/generated/sklearn.metrics.pairwise.manhattan_distances.html
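
The same distance for one pair of vectors, written by hand as a minimal sketch (the function name is made up here):

```python
def manhattan_distance(v1, v2):
    # sum of absolute element-wise differences
    return sum(abs(a - b) for a, b in zip(v1, v2))

print(manhattan_distance([1, 1, 1, 0], [1, 0, 0, 0]))
# 2
print(manhattan_distance([0, 1, 0, 1], [1, 0, 1, 0]))
# 4
```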

Similarity

Cosine similarity

from sklearn.metrics.pairwise import cosine_similarity
from sklearn.metrics.pairwise import cosine_distances
from sklearn.metrics.pairwise import pairwise_distances
from scipy.spatial.distance import pdist, squareform

# for a 2D array `matrix`, these are equivalent (up to floating-point error):
#   cosine_similarity(matrix)
#   1 - cosine_distances(matrix)
#   1 - pairwise_distances(matrix, metric='cosine')
#   1 - squareform(pdist(matrix, 'cosine'))

# inputs must be 2D: one row per vector
cosine_similarity([[0, 0, 0, 0]], [[0, 0, 0, 0]])
# array([[0.]])

cosine_similarity([[1, 0, 0, 0]], [[1, 0, 0, 0]])
# array([[1.]])

cosine_similarity([[1, 0, 1, 0]], [[0, 1, 0, 1]])
# array([[0.]])

cosine_similarity([[1, 0, 0, 1]], [[1, 0, 0, 0]])
# array([[0.70710678]])

cosine_similarity([[1, 0, 0, 1]], [[1, 0, 1, 0]])
# array([[0.5]])

ref:
http://scikit-learn.org/stable/modules/generated/sklearn.metrics.pairwise.cosine_similarity.html
http://scikit-learn.org/stable/modules/generated/sklearn.metrics.pairwise.cosine_distances.html
http://scikit-learn.org/stable/modules/generated/sklearn.metrics.pairwise.pairwise_distances.html
https://docs.scipy.org/doc/scipy/reference/generated/scipy.spatial.distance.pdist.html
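
Cosine similarity is the dot product divided by the product of the two vector lengths. A minimal hand-written sketch for one pair of vectors (the function name is made up here). Returning 0.0 for an all-zero vector is a choice made to match the first example above.

```python
import math

def cosine_similarity_1d(v1, v2):
    dot = sum(a * b for a, b in zip(v1, v2))
    norm1 = math.sqrt(sum(a * a for a in v1))
    norm2 = math.sqrt(sum(b * b for b in v2))
    if norm1 == 0 or norm2 == 0:
        # the angle is undefined for a zero vector; treat it as "not similar"
        return 0.0
    return dot / (norm1 * norm2)

print(round(cosine_similarity_1d([1, 0, 0, 1], [1, 0, 0, 0]), 8))
# 0.70710678
print(round(cosine_similarity_1d([1, 0, 0, 1], [1, 0, 1, 0]), 8))
# 0.5
```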

Jaccard similarity coefficient score

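# NOTE: jaccard_similarity_score was deprecated in scikit-learn 0.21 and removed in 0.23.
# For binary label vectors like these it counts matching positions (including shared zeros),
# so its replacement, jaccard_score, can return different values.
from sklearn.metrics import jaccard_similarity_score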

jaccard_similarity_score([0, 0, 0, 0], [0, 0, 0, 0])
# 1.0

jaccard_similarity_score([0, 0, 0, 0], [1, 0, 0, 0])
# 0.75

jaccard_similarity_score([1, 0, 0, 0], [1, 0, 0, 0])
# 1.0

jaccard_similarity_score([1, 0, 1, 0], [0, 1, 0, 1])
# 0.0

ref:
http://scikit-learn.org/stable/modules/generated/sklearn.metrics.jaccard_similarity_score.html

http://datascience.stackexchange.com/questions/5121/applications-and-differences-for-jaccard-similarity-and-cosine-similarity
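
The 0.75 above comes from counting positions where the two vectors agree, shared zeros included. The set-based Jaccard index, |A ∩ B| / |A ∪ B| over the positions that are switched on, gives a different answer for the same input. A minimal sketch (the function name and the convention for two empty sets are choices made here):

```python
def jaccard_similarity(v1, v2):
    # treat each binary vector as the set of indices where it is non-zero
    set1 = {i for i, x in enumerate(v1) if x}
    set2 = {i for i, x in enumerate(v2) if x}
    union = set1 | set2
    if not union:
        # convention chosen here: two empty sets count as identical
        return 1.0
    return len(set1 & set2) / len(union)

print(jaccard_similarity([0, 0, 0, 0], [1, 0, 0, 0]))
# 0.0
print(jaccard_similarity([1, 0, 0, 1], [1, 0, 1, 0]))
# 0.3333333333333333
```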

Log-Likelihood similarity

TODO

Pearson correlation coefficient

It has a value between +1 and −1 inclusive, where 1 is total positive linear correlation, 0 is no linear correlation, and −1 is total negative linear correlation. Only calculate the Pearson correlation for two users who have rated items in common, and only when the number of common items is greater than 1, preferably more than 5 to 10.

For high-dimensional binary attributes, the Pearson correlation coefficient and cosine similarity perform better than the Jaccard similarity coefficient score.

from scipy.stats import pearsonr

pearsonr([1, 0, 1, 1], [0, 0, 0, 0])
# (nan, 1.0)

pearsonr([1, 0, 1, 1], [1, 0, 0, 0])
# (0.33333333333333331, 0.66666666666666607)

pearsonr([1, 0, 1, 0], [0, 1, 0, 1])
# (-1.0, 0.0)

ref:
https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.pearsonr.html
http://stackoverflow.com/questions/11429604/how-is-nan-handled-in-pearson-correlation-user-user-similarity-matrix-in-a-recom
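
The nan in the first example appears because a constant vector has zero variance, so the denominator of the correlation is zero. A minimal hand-written sketch that computes only the coefficient (no p-value) and makes that case explicit; the function name is made up here:

```python
import math

def pearson(v1, v2):
    n = len(v1)
    mean1, mean2 = sum(v1) / n, sum(v2) / n
    cov = sum((a - mean1) * (b - mean2) for a, b in zip(v1, v2))
    var1 = sum((a - mean1) ** 2 for a in v1)
    var2 = sum((b - mean2) ** 2 for b in v2)
    if var1 == 0 or var2 == 0:
        # a constant vector has no variance, so the correlation is undefined
        return float('nan')
    return cov / math.sqrt(var1 * var2)

print(pearson([1, 0, 1, 0], [0, 1, 0, 1]))
# -1.0
print(math.isnan(pearson([1, 0, 1, 1], [0, 0, 0, 0])))
# True
```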

Dissimilarity

Dice dissimilarity

from scipy.spatial.distance import dice
import numpy as np

v1 = np.array([0, 0, 0, 0])
v2 = np.array([0, 0, 0, 0])

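try:
    sim = 1.0 - dice(v1.astype(bool), v2.astype(bool))
except ZeroDivisionError:
    sim = 0

# depending on the SciPy version, all-zero inputs may yield nan instead of raising
if np.isnan(sim):
    sim = 0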

ref:
https://docs.scipy.org/doc/scipy/reference/generated/scipy.spatial.distance.dice.html
https://docs.scipy.org/doc/scipy/reference/generated/scipy.spatial.distance.kulsinski.html
https://docs.scipy.org/doc/scipy/reference/generated/scipy.spatial.distance.sokalsneath.html
https://docs.scipy.org/doc/scipy/reference/generated/scipy.spatial.distance.yule.html
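
The Dice dissimilarity of two boolean vectors is (ntf + nft) / (2 x ntt + ntf + nft), where ntt counts positions that are true in both and ntf / nft count positions that are true in only one. A minimal plain-Python sketch of the corresponding similarity, with the all-zero case handled up front (the function name and the 0.0 fallback are choices made here, mirroring the sim = 0 fallback above):

```python
def dice_similarity(v1, v2):
    # positions that are switched on in both vectors
    both = sum(1 for a, b in zip(v1, v2) if a and b)
    # positions that are switched on in exactly one of the two vectors
    only_one = sum(1 for a, b in zip(v1, v2) if bool(a) != bool(b))
    denominator = 2 * both + only_one
    if denominator == 0:
        # both vectors are all zeros, so the ratio is undefined
        return 0.0
    # 1 - dissimilarity simplifies to 2 * both / denominator
    return 2 * both / denominator

print(dice_similarity([0, 0, 0, 0], [0, 0, 0, 0]))
# 0.0
print(dice_similarity([1, 0, 0, 1], [1, 0, 1, 0]))
# 0.5
```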