Spark SQL cookbook (Python)

Access SparkSession

from pyspark.sql import SparkSession

# get the default SparkSession instance
spark = SparkSession.builder.getOrCreate()

sc = spark.sparkContext
sc.setLogLevel('INFO')

Access custom configurations

$ spark-submit \
--master spark://localhost:7077 \
--properties-file my-properties.conf \
your_spark_app.py

# in my-properties.conf
# spark.myapp.db_host 192.168.100.10
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
print(sc.getConf().get('spark.myapp.db_host'))

ref:
https://stackoverflow.com/questions/31115881/how-to-load-java-properties-file-and-use-in-spark

Create an RDD

array = [
    1,
    5,
    7,
    2,
    3,
]
rdd = sc.parallelize(array)
df = rdd.toDF('int')
# +-----+
# |value|
# +-----+
# |    1|
# |    5|
# |    7|
# |    2|
# |    3|
# +-----+

Read data from MySQL

$ pyspark --packages "mysql:mysql-connector-java:5.1.41"
url = 'jdbc:mysql://127.0.0.1:3306/albedo'
properties = {'user': 'root', 'password': '123'}
df = spark.read.jdbc(url, 'app_repostarring', properties=properties)

ref:
https://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases
https://dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-configuration-properties.html

Write data to MySQL

CREATE TABLE `recommender_songrecommend` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `user_id` int(11) NOT NULL,
  `song_id` int(11) NOT NULL,
  `score` double NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1;
matrix = [
    (1, 2, 0.1),
    (3, 4, 0.23456),
    (5, 6, 7.89),
    (7, 8, -10.111213),
]
df = spark.createDataFrame(matrix, ['user', 'item', 'score'])

url = 'jdbc:mysql://192.168.11.200:3306/DATABASE?user=USER&password=PASSWORD&verifyServerCertificate=false&useSSL=false&rewriteBatchedStatements=true'
properties = {'driver': 'com.mysql.jdbc.Driver'}
df \
    .selectExpr('user AS user_id', 'item AS song_id', 'score') \
    .write.jdbc(url, table='recommender_songrecommend', mode='append', properties=properties)

ref:
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter.jdbc
https://stackoverflow.com/questions/2993251/jdbc-batch-insert-performance/10617768#10617768

Read data from SQLite

$ pyspark --packages "org.xerial:sqlite-jdbc:3.16.1"
from pyspark.sql.functions import lit
from pyspark.sql.types import IntegerType, StructField, StructType, TimestampType

props = {'driver': 'org.sqlite.JDBC', 'date_string_format': 'yyyy-MM-dd HH:mm:ss'}
df = spark.read.jdbc("jdbc:sqlite:db.sqlite3", "app_repostarring", properties=props)

df = df.where(df.stargazers_count >= min_stargazers_count)
df = df.select('from_user_id', 'repo_id', 'created_at')
df = df.toDF('user', 'item', 'item_created_at')
df = df.withColumn('rating', lit(1))
df = df.select('user', 'item', 'rating', 'item_created_at')

schema = StructType([
    StructField('user', IntegerType(), nullable=False),
    StructField('item', IntegerType(), nullable=False),
    StructField('rating', IntegerType(), nullable=False),
    StructField('item_created_at', TimestampType(), nullable=False),
])
df = spark.createDataFrame(df.rdd, schema)

ref:
https://github.com/xerial/sqlite-jdbc

java.text.ParseException: Unparseable date: "2016-04-22 17:26:54"
https://github.com/xerial/sqlite-jdbc/issues/88

Read data from parquet

from pyspark.sql.utils import AnalysisException

raw_df_filepath = 'raw_df.parquet'

try:
    raw_df = spark.read.format('parquet').load(raw_df_filepath)
except AnalysisException as exc:
    if 'Path does not exist' in exc.desc:
        raw_df = load_raw_data()
        raw_df.write.format('parquet').save(raw_df_filepath)
    else:
        raise exc

ref:
https://community.hortonworks.com/articles/21303/write-read-parquet-file-in-spark.html

Create a DataFrame

matrix = [
    (1, 1, 1),
    (1, 2, 1),
    (2, 1, 0),
    (3, 1, 1),
    (3, 3, 1),
    (3, 4, 1),
    (4, 1, 0),
    (4, 2, 0),
    (5, 9, 1),
    (5, 5, 0),
]
df = spark.createDataFrame(matrix, ['user', 'item', 'rating'])

ref:
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.SparkSession.createDataFrame

Create a DataFrame with explicit schema

from pyspark.sql.types import *

matrix = [
    ('Alice', 0.5, 5.0),
    ('Bob',   0.2, 92.0),
    ('Tom',   0.0, 122.0),
    ('Zack',  0.1, 1.0),
]
schema = StructType([
    StructField('name', StringType(), nullable=False),
    StructField('prediction', DoubleType(), nullable=False),
    StructField('rating', DoubleType(), nullable=False)
])
df = spark.createDataFrame(matrix, schema)
df.printSchema()

new_df = spark.createDataFrame(someRDD, df.schema)

ref:
https://github.com/awesome-spark/spark-gotchas/blob/master/05_spark_sql_and_dataset_api.md

Create a nested schema

from pyspark.sql.types import ArrayType
from pyspark.sql.types import FloatType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import StructField
from pyspark.sql.types import StructType

recommendation_schema = StructType([
    StructField('item', IntegerType(), nullable=False),
    StructField('rating', FloatType(), nullable=False),
])

user_recommendations_schema = StructType([
    StructField('user', IntegerType(), nullable=False),
    StructField('recommendations', ArrayType(recommendation_schema), nullable=False),
])

matrix = [
    (1, [[100, 1.0], [200, 2.0]]),
    (2, [[300, 3.0]]),
]
df = spark.createDataFrame(matrix, user_recommendations_schema)

df.printSchema()
# root
# |-- user: integer (nullable = false)
# |-- recommendations: array (nullable = false)
# |    |-- element: struct (containsNull = true)
# |    |    |-- item: integer (nullable = false)
# |    |    |-- rating: float (nullable = false)

df.show()
# +----+--------------------+
# |user|     recommendations|
# +----+--------------------+
# |   1|[[100,1.0], [200,...|
# |   2|         [[300,3.0]]|
# +----+--------------------+

ref:
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.types

Change schema of a DataFrame

from pyspark.sql.types import DoubleType, IntegerType, StructField, StructType

schema = StructType([
    StructField('user', IntegerType(), nullable=False),
    StructField('item', IntegerType(), nullable=False),
    StructField('rating', DoubleType(), nullable=False),
])
df = spark.createDataFrame(df.rdd, schema)

Get the number of partitions

df.rdd.getNumPartitions()

Split a DataFrame into chunks (partitions)

def write_to_db(partial_rows):
    values_tuples = [(row.user, row.item, row.score, year, week) for row in partial_rows]
    result = your_mysql_insert_func(values_tuples)
    return [result, ]

save_result = df \
    .repartition(400) \
    .rdd \
    .mapPartitions(write_to_db) \
    .collect()

ref:
https://stackoverflow.com/questions/24898700/batching-within-an-apache-spark-rdd-map
https://stackoverflow.com/questions/35370826/using-spark-for-sequential-row-by-row-processing-without-map-and-reduce

Show a DataFrame

# print the schema in a tree format
df.printSchema()

# print top 20 rows
df.show()

# print top 100 rows
df.show(100)

# calculate the descriptive statistics
df.describe().show()
df.describe(['like_count', ]).show()

ref:
http://spark.apache.org/docs/latest/sql-programming-guide.html#untyped-dataset-operations-aka-dataframe-operations

Create a column with a literal value

from pyspark.sql.functions import lit

df = df.withColumn('like', lit(1))

ref:
http://stackoverflow.com/questions/33681487/how-do-i-add-a-new-column-to-a-spark-dataframe-using-pyspark

Return a fraction of a DataFrame

fraction = {
    'u1': 0.5,
    'u2': 0.5,
    'u3': 0.5,
    'u4': 0.5,
    'u5': 0.5,
}
df.sampleBy('user', fraction).show()
# +----+----+----------+
# |user|item|prediction|
# +----+----+----------+
# |  u1|  i1|         1|
# |  u3|  i4|         4|
# |  u4|  i1|         1|
# |  u4|  i2|         3|
# |  u5|  i5|         5|
# +----+----+----------+

Show distinct values of a column

df.select('user').distinct().show()

ref:
https://www.analyticsvidhya.com/blog/2016/10/spark-dataframe-and-operations/

Rename columns

df.printSchema()
# root
# |-- from_user_id: integer (nullable = true)
# |-- repo_id: integer (nullable = true)

df = df.toDF('user', 'item')
# or
df = df.withColumnRenamed('from_user_id', 'user').withColumnRenamed('repo_id', 'item')

Convert a column to double type

df = df.withColumn('prediction', df['prediction'].cast('double'))

ref:
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.Column.cast

Update a column based on conditions

from pyspark.sql.functions import when

df = df.withColumn('label', when(df['prediction'] > 0.5, 1).otherwise(0))

ref:
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.when
http://stackoverflow.com/questions/34908448/spark-add-column-to-dataframe-conditionally

Drop rows with null values from a DataFrame

predictions = predictions.dropna(subset=['prediction', ])

ref:
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.dropna

DataFrame subtract another DataFrame

matrix1 = [
    (1, 2, 123),
    (3, 4, 1),
    (5, 6, 45),
    (7, 8, 3424),
]
df1 = spark.createDataFrame(matrix1, ['user','item', 'play_count'])

matrix2 = [
    (3, 4),
    (5, 6),
    (9, 1),
    (7, 8),
    (1, 6),
    (1, 1),
]
df2 = spark.createDataFrame(matrix2, ['user','item'])

df2.subtract(df1.select('user', 'item')).show()
# +----+----+
# |user|item|
# +----+----+
# |   1|   1|
# |   1|   6|
# |   9|   1|
# +----+----+

# or

testing_rdd = df.rdd.subtract(training.rdd)
testing = spark.createDataFrame(testing_rdd, df.schema)

ref:
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.subtract

Convert a DataFrame column into a Python list

# list
popular_items = [row['item'] for row in popular_df.select('item').collect()]

# set
all_items = {row.id for row in als_model.itemFactors.select('id').collect()}

Concatenate (merge) two DataFrames

full_df = df1.union(df2)

Convert a DataFrame to a Python dict

# the DataFrame should have exactly two columns: the first becomes the key, the second the value
d = df.rdd.collectAsMap()
d['some_key']

Compute (approximate or exact) median of a numerical column

approximate_median = df.approxQuantile('count', [0.5, ], 0.25)
exact_median = df.approxQuantile('count', [0.5, ], 0.0)

maximum = df.approxQuantile('count', [1.0, ], 0.1)
minimum = df.approxQuantile('count', [0.0, ], 0.1)

ref:
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.approxQuantile

Find frequent items for columns

df = rating_df.freqItems(['item', ], support=0.01)
# +--------------------+
# |      item_freqItems|
# +--------------------+
# |[194512, 371798, ...|
# +--------------------+

# get the value of a DataFrame column
popular_items = df.collect()[0].item_freqItems

ref:
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.freqItems

Broadcast a value

bc_candidates = sc.broadcast(set([1, 2, 4, 5, 8]))
print(bc_candidates.value)
# {8, 1, 2, 4, 5}

bc_df = sc.broadcast(df.collect())
df = spark.createDataFrame(bc_df.value)

ref:
http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables

Broadcast a DataFrame in join

import pyspark.sql.functions as F

large_df.join(F.broadcast(small_df), 'some_key')

ref:
http://stackoverflow.com/questions/34053302/pyspark-and-broadcast-join-example
https://chapeau.freevariable.com/2014/09/improving-spark-application-performance.html

Cache a DataFrame

df.cache()

ref:
http://stackoverflow.com/questions/38056774/spark-cache-vs-broadcast

Show query execution plan

df.explain(extended=True)

Use SQL to query a DataFrame

props = {'driver': 'org.sqlite.JDBC', 'date_string_format': 'yyyy-MM-dd HH:mm:ss'}
df = spark.read.jdbc("jdbc:sqlite:db.sqlite3", "app_repostarring", properties=props)
df.createOrReplaceTempView('repo_stars')

query = 'SELECT DISTINCT repo_id AS item FROM repo_stars WHERE stargazers_count > 1000'
df2 = spark.sql(query)
df2.show()

query = """
SELECT
    from_user_id AS user,
    count(repo_id) AS count
FROM repo_stars
GROUP BY from_user_id
ORDER BY count DESC
"""
df = spark.sql(query)

params = {'top_n': top_n}
query = """
SELECT 
    repo_id AS item,
    MAX(stargazers_count) AS stars
FROM repo_stars
GROUP BY repo_id
ORDER BY stars DESC
LIMIT {top_n}
""".format(**params)
popular_df = spark.sql(query)

ref:
https://sparkour.urizone.net/recipes/using-sql-udf/

WHERE ... IN ...

from pyspark.sql.functions import col

item_ids = [1, 2, 5, 8]
raw_df \
    .where(col('repo_id').isin(item_ids)) \
    .select('repo_url') \
    .collect()

ORDER BY multiple columns

import pyspark.sql.functions as F

rating_df = raw_df \
    .selectExpr('from_user_id AS user', 'repo_id AS item', '1 AS rating', 'starred_at') \
    .orderBy('user', F.col('starred_at').desc())

Aggregate

import pyspark.sql.functions as F

df.agg(F.min('user'), F.max('user'), F.min('item'), F.max('item')).show()

max_value = user_star_count_df.agg(F.max('stars').alias('max_stars')).collect()[0]['max_stars']

SELECT COUNT(DISTINCT xxx) ...

import pyspark.sql.functions as F

matrix = [
    (1, 1, 1),
    (1, 2, 1),
    (2, 1, 0),
    (3, 1, 1),
    (3, 3, 1),
    (3, 4, 1),
    (4, 1, 0),
    (4, 2, 0),
    (5, 9, 1),
    (5, 5, 0),
]
df = spark.createDataFrame(matrix, ['user', 'item', 'rating'])
df.agg(F.countDistinct('user')).show()
# +--------------------+
# |count(DISTINCT user)|
# +--------------------+
# |                   5|
# +--------------------+

ref:
http://stackoverflow.com/questions/40888946/spark-dataframe-count-distinct-values-of-every-column

SELECT MAX(xxx) ... GROUP BY

df.groupBy('user').count().filter('count >= 4').show()

popular_df = raw_df \
    .groupBy('repo_id') \
    .agg(F.max('stargazers_count').alias('stars')) \
    .orderBy('stars', ascending=False) \
    .limit(top_n)

ref:
http://stackoverflow.com/questions/30616380/spark-how-to-count-number-of-records-by-key

SELECT COUNT() ... GROUP BY

prediction_df.groupBy('user').count().show()
# +------+-----+
# |  user|count|
# +------+-----+
# |649661|   15|
# |464340|   15|
# |468780|   15|
# |489233|   11|
# |455143|   14|
# +------+-----+

stargazers_count_df = rating_df \
    .groupBy('item') \
    .agg(F.count('user').alias('stargazers_count')) \
    .orderBy('stargazers_count', ascending=False)
# +--------+----------------+
# |    item|stargazers_count|
# +--------+----------------+
# | 3544424|             137|
# | 2126244|             120|
# | 7691631|             115|
# | 1362490|             112|
# |  943149|             112|
# +--------+----------------+

starred_count_df = rating_df \
    .groupBy('user') \
    .agg(F.count('item').alias('starred_count')) \
    .orderBy('starred_count', ascending=False)
# +-------+-------------+
# |   user|starred_count|
# +-------+-------------+
# |  48936|         7201|
# |4560482|         5898|
# |3382565|         3978|
# |  10652|         3586|
# |  31996|         3459|
# +-------+-------------+

You may want to use approx_count_distinct.
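
A minimal sketch on the rating_df from the examples above; the optional rsd argument bounds the relative standard deviation of the estimate:

import pyspark.sql.functions as F

rating_df.agg(F.approx_count_distinct('user').alias('approx_user_count')).show()
rating_df.agg(F.approx_count_distinct('user', rsd=0.01).alias('approx_user_count')).show()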

GROUP_CONCAT a column

from pyspark.sql.functions import expr

per_user_predictions_df = output_df \
  .orderBy(['user', 'prediction'], ascending=False) \
  .groupBy('user') \
  .agg(expr('collect_list(item) as items'))
# +--------+--------------------+
# |    user|               items|
# +--------+--------------------+
# |    2142|[36560369, 197450...|
# |   47217|[40693501, 643554...|
# +--------+--------------------+

ref:
http://stackoverflow.com/questions/31640729/spark-sql-replacement-for-mysql-group-concat-aggregate-function

GROUP_CONCAT multiple columns

from pyspark.sql.functions import col, collect_list, struct

matrix = [
    (1, 1, 0.1),
    (1, 2, 5.1),
    (1, 6, 0.0),
    (2, 6, 9.3),
    (3, 1, 0.54),
    (3, 5, 0.83),
    (4, 1, 0.65),
    (4, 4, 1.023),
]
df = spark.createDataFrame(matrix, ['user', 'item', 'prediction'])

df \
    .groupBy("user") \
    .agg(collect_list(struct(col('item'), col('prediction'))).alias("recommendations")) \
    .show(truncate=False)
# +----+---------------------------+
# |user|recommendations            |
# +----+---------------------------+
# |1   |[[1,0.1], [2,5.1], [6,0.0]]|
# |3   |[[1,0.54], [5,0.83]]       |
# |2   |[[6,9.3]]                  |
# |4   |[[1,0.65], [4,1.023]]      |
# +----+---------------------------+

ref:
https://stackoverflow.com/questions/37737843/aggregating-multiple-columns-with-custom-function-in-spark

SELECT ... RANK() OVER (PARTITION BY ... ORDER BY)

from pyspark.sql import Window
from pyspark.sql.functions import col
from pyspark.sql.functions import expr
import pyspark.sql.functions as F

window_spec = Window.partitionBy('from_user_id').orderBy(col('starred_at').desc())
per_user_actual_items_df = raw_df \
    .select('from_user_id', 'repo_id', 'starred_at', F.rank().over(window_spec).alias('rank')) \
    .where('rank <= 10') \
    .groupBy('from_user_id') \
    .agg(expr('collect_list(repo_id) as items')) \
    .withColumnRenamed('from_user_id', 'user')
# +--------+--------------------+
# |    user|               items|
# +--------+--------------------+
# |    2142|[29122050, 634846...|
# |   59990|[9820191, 8729416...|
# +--------+--------------------+

window_spec = Window.partitionBy('user').orderBy(col('prediction').desc())
per_user_predicted_items_df = output_df \
    .select('user', 'item', 'prediction', F.rank().over(window_spec).alias('rank')) \
    .where('rank <= 10') \
    .groupBy('user') \
    .agg(expr('collect_list(item) as items'))
# +--------+--------------------+
# |    user|               items|
# +--------+--------------------+
# |    2142|[36560369, 197450...|
# |   47217|[40693501, 643554...|
# +--------+--------------------+

ref:
https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
https://github.com/awesome-spark/spark-gotchas/blob/master/05_spark_sql_and_dataset_api.md#window-functions

Left anti join / Left excluding join

clean_df = rating_df.join(to_remove_items, 'item', 'left_anti')

ref:
https://www.codeproject.com/Articles/33052/Visual-Representation-of-SQL-Joins

Outer join

m1 = [
    (1, 1, 1),
    (1, 2, 1),
    (1, 4, 1),
    (2, 2, 1),
    (2, 3, 1),
    (3, 5, 1),
]
m1df = spark.createDataFrame(m1, ['user', 'item', 'rating'])
m1df = m1df.where('user = 1').alias('m1df')

m2 = [
    (1, 100),
    (2, 200),
    (3, 300),
    (4, 400),
    (5, 500),
    (6, 600),
]
m2df = spark.createDataFrame(m2, ['item', 'count'])
m2df = m2df.alias('m2df')

m1df.join(m2df, m1df.item == m2df.item, 'rightouter') \
.where('m1df.user IS NULL') \
.orderBy('m2df.count', ascending=False) \
.selectExpr('1 AS user', 'm2df.item', '0 AS rating') \
.show()
# +----+----+------+
# |user|item|rating|
# +----+----+------+
# |   1|   6|     0|
# |   1|   5|     0|
# |   1|   3|     0|
# +----+----+------+

ref:
https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-sql-joins.html

Cross join

df = m1df.select('user').distinct().crossJoin(m2df.select('item'))
df.show()

m1df.select('user').distinct().createOrReplaceTempView('f1')
m2df.select('item').createOrReplaceTempView('f2')

query = """
SELECT f1.user, f2.item, 0 AS rating
FROM f1
CROSS JOIN f2
"""
df = spark.sql(query)
df.show()

all_user_item_pair_df = als_model.userFactors.selectExpr('id AS user') \
    .crossJoin(als_model.itemFactors.selectExpr('id AS item'))
# +----+----+
# |user|item|
# +----+----+
# |xxxx|oooo|
# +----+----+

ref:
http://stackoverflow.com/questions/5464131/finding-pairs-that-do-not-exist-in-a-different-table

Spark RDD methods (Python / Scala)

Some of the following methods are available on every RDD, while others only exist on PairRDDs. Since I have used Spark's Python API and Scala API in different projects, the examples below mix both. With the Scala API you have to pay close attention to the types each method accepts and returns; the Python API has no such restriction, since Python is a dynamic language after all.

ref:
http://spark.apache.org/docs/latest/programming-guide.html#transformations
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions

RDD Methods

map()

Signature: def map[U](f: (T) ⇒ U): RDD[U]

The function passed to map() receives each element of the RDD (from a DataFrame's point of view, each row) as its input, func(row), and returns an arbitrary object (for example an int, a string, or a tuple). So map() produces an RDD with the same number of rows, but possibly with a different element type.

matrix = [
    (1, 1, 1),
    (1, 2, 1),
    (1, 3, 1),
    (1, 6, 0),
    (2, 6, 1),
    (3, 1, 1),
    (3, 5, 1),
    (4, 1, 0),
    (4, 4, 1),
]
df = spark.createDataFrame(matrix, ['k', 'v', 'whatever'])

df.rdd.map(lambda row: (row[0], row[1])).collect()
# [(1, 1), (1, 2), (1, 3), (1, 6), (2, 6), (3, 1), (3, 5), (4, 1), (4, 4)]
import org.apache.spark.rdd.RDD

val rdd: RDD[(String, Int)] = sc.parallelize(Seq(
  ("Python", 1),
  ("Scala", 3),
  ("Java", 2)
))
val test: RDD[String] = rdd.map({
  case (lang, count) => {
    s"${lang}_${count}"
  }
})
test.collect().foreach(println)
// Python_1
// Scala_3
// Java_2

flatMap()

Signature: def flatMap[U](f: (T) ⇒ TraversableOnce[U]): RDD[U]

flatMap() is similar to map(): the function it takes also receives each row of the RDD as input, func(row). The difference is that the function passed to flatMap() must return an iterable (for example a tuple or a list, possibly empty), and flatMap() flattens the returned results. As a consequence, the count() after flatMap() may be larger or smaller than that of the original RDD.

matrix = [
    (1, 1, 1),
    (1, 2, 1),
    (1, 3, 1),
    (1, 6, 0),
    (2, 6, 1),
    (3, 1, 1),
    (3, 5, 1),
    (4, 1, 0),
    (4, 4, 1),
]
df = spark.createDataFrame(matrix, ['k', 'v', 'whatever'])

df.rdd.flatMap(lambda row: (row[0], row[1])).collect()
# [1, 1, 1, 2, 1, 3, 1, 6, 2, 6, 3, 1, 3, 5, 4, 1, 4, 4]

df.rdd.flatMap(lambda row: (row[2], )).collect()
# [1, 1, 1, 0, 1, 1, 1, 0, 1]

ref:
http://backtobazics.com/big-data/spark/apache-spark-flatmap-example/
http://apachesparkbook.blogspot.tw/2015/05/difference-between-map-flatmap-in-rdd.html
http://stackoverflow.com/questions/42867457/map-each-element-of-a-list-in-spark
http://stackoverflow.com/questions/21096432/list-or-iterator-of-tuples-returned-by-map-pyspark

reduce()

Signature: def reduce(f: (T, T) ⇒ T): T

The function passed to reduce() takes two elements of the RDD at a time, func(element1, element2), and returns a single object of the same type as its inputs. In the end, reduce() produces a single value.

array = [
    1,
    5,
    4,
    2,
    3,
]
rdd = sc.parallelize(array)

rdd.reduce(lambda element1, element2: element1 + element2)
# 15

def max(element1, element2):
    return element1 if element1 > element2 else element2

rdd.reduce(max)
# 5

treeReduce()

Signature: def treeReduce(f: (T, T) ⇒ T, depth: Int = 2): T

A plain reduce() sends the partial result of every partition straight back to the driver for the final computation, which can become a bottleneck when there are many partitions or each partition holds a lot of data. In that case you can switch to treeReduce(), although using it carelessly can have the opposite effect. The same relationship exists between aggregate() and treeAggregate().
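
A minimal sketch, reusing the same kind of integer RDD as in the reduce() example above:

rdd = sc.parallelize([1, 5, 4, 2, 3])

# same semantics as reduce(), but partial results are merged in several levels
# instead of all at once on the driver
rdd.treeReduce(lambda element1, element2: element1 + element2, depth=2)
# 15

# treeAggregate() is the multi-level counterpart of aggregate()
rdd.treeAggregate(0, lambda acc, element: acc + element, lambda acc1, acc2: acc1 + acc2, depth=2)
# 15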

ref:
https://umbertogriffo.gitbooks.io/apache-spark-best-practices-and-tuning/content/treereduce_and_treeaggregate_demystified.html

aggregate()

Signature: def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U): U

import org.apache.spark.rdd.RDD

case class WikipediaArticle(title: String, text: String) {
  def mentionsLanguage(lang: String): Boolean = text.split(' ').contains(lang)
}

val wikiRdd: RDD[WikipediaArticle] = sc.parallelize(Seq(
  WikipediaArticle("a1", "abc Scala xyz"),
  WikipediaArticle("a2", "abc Python xyz"),
  WikipediaArticle("a3", "abc Scala xyz"),
  WikipediaArticle("a4", "abc Scala xyz")
))

def occurrencesOfLang(lang: String, rdd: RDD[WikipediaArticle]): Int = {
  val accumulateOp = (total: Int, a: WikipediaArticle) => total + 1
  val combineOp = (count1: Int, count2: Int) => count1 + count2
  rdd.filter(_.mentionsLanguage(lang)).aggregate(0)(accumulateOp, combineOp)
}

occurrencesOfLang("Scala", wikiRdd)
// 3

ref:
https://stackoverflow.com/questions/28240706/explain-the-aggregate-functionality-in-spark

PairRDD Methods

A PairRDD is an RDD of key/value pairs, i.e. RDD[(K, V)]. K and V may be primitive types, but they can also be other objects or collections. The methods in PairRDDFunctions likewise return RDD[(K, V)].

ref:
https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html

mapValues()

Signature: def mapValues[U](f: (V) ⇒ U): RDD[(K, U)]

The function passed to mapValues() takes a single argument, the V of the PairRDD; it only transforms the values and leaves the keys untouched.

val rdd = sc.parallelize(Seq(
  ("Python", 1),
  ("Scala", 3),
  ("Java", 2)
))

rdd
  .mapValues((count: Int) => {
    count * 10
  })
  .collect()
// Array((Python,10), (Scala,30), (Java,20))

ref:
http://apachesparkbook.blogspot.tw/2015/12/mapvalues-example.html

groupByKey()

Signature: def groupByKey(): RDD[(K, Iterable[V])]

Avoid groupByKey() in the following situations:

  • If the operation can be expressed as groupByKey followed by an associative and commutative reducing operation on the values (sum, count, max / min), it should be replaced by reduceByKey.
  • If the operation can be expressed as a combination of a local sequence operation and a merge operation (online variance / mean, top-n observations), it should be expressed with combineByKey or aggregateByKey.
  • If the final goal is to traverse the values in a specific order (groupByKey followed by sorting the values followed by iteration), it can typically be rewritten as repartitionAndSortWithinPartitions with a custom partitioner and ordering, followed by mapPartitions (see the sketch right after this list).
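
A minimal PySpark sketch of the repartitionAndSortWithinPartitions pattern mentioned in the last bullet; the pair RDD and the modulo partitioner are made up for illustration:

rdd = sc.parallelize([(0, 5), (3, 8), (2, 6), (0, 8), (3, 8), (1, 3)])

# send even keys to partition 0 and odd keys to partition 1,
# sorting the records by key inside each partition
partitioned = rdd.repartitionAndSortWithinPartitions(numPartitions=2, partitionFunc=lambda key: key % 2)
partitioned.glom().collect()
# [[(0, 5), (0, 8), (2, 6)], [(1, 3), (3, 8), (3, 8)]]
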
case class WikipediaArticle(title: String, text: String) {
  def mentionsLanguage(lang: String): Boolean = text.split(' ').contains(lang)
}

val langs = List(
  "Java", "PHP", "Python", "Scala"
)

val wikiRdd: RDD[WikipediaArticle] = sc.parallelize(Seq(
  WikipediaArticle("a1", "a1 Scala"),
  WikipediaArticle("a2", "a2 Python"),
  WikipediaArticle("a3", "a3 Scala"),
  WikipediaArticle("a4", "a4 Java"),
  WikipediaArticle("a5", "a5 Java"),
  WikipediaArticle("a6", "a6 Scala")
))

def makeIndex(langs: List[String], rdd: RDD[WikipediaArticle]): RDD[(String, Iterable[WikipediaArticle])] = {
  rdd
    .flatMap((article: WikipediaArticle) => {
      langs
        .filter((lang: String) => {
          article.mentionsLanguage(lang)
        })
        .map((lang: String) => {
          (lang, article)
        })
    })
    .groupByKey()
}

def index: RDD[(String, Iterable[WikipediaArticle])] = makeIndex(langs, wikiRdd)
// Array(
//   (Java,CompactBuffer(WikipediaArticle(a4,a4 Java), WikipediaArticle(a5,a5 Java))),
//   (Python,CompactBuffer(WikipediaArticle(a2,a2 Python))),
//   (Scala,CompactBuffer(WikipediaArticle(a1,a1 Scala), WikipediaArticle(a3,a3 Scala), WikipediaArticle(a6,a6 Scala)))
// )

def rankLangsUsingIndex(index: RDD[(String, Iterable[WikipediaArticle])]): List[(String, Int)] = {
  index
    .map({
      case (lang, articles) => {
        (lang, articles.size)
      }
    })
    .sortBy(-_._2)
    .collect()
    .toList
}

rankLangsUsingIndex(index)
// List((Scala,3),(Java,2),(Python,1),(PHP,0))

Spark Best Practices
https://github.com/beeva/beeva-best-practices/blob/master/big_data/spark/README.md

RDD actions and Transformations by Example
https://github.com/awesome-spark/spark-gotchas/blob/master/04_rdd_actions_and_transformations_by_example.md

Avoid groupByKey when performing an associative reductive operation
https://umbertogriffo.gitbooks.io/apache-spark-best-practices-and-tuning/content/avoid_groupbykey_when_performing_an_associative_re.html

reduceByKey()

Signature: def reduceByKey(func: (V, V) ⇒ V): RDD[(K, V)]

Both arguments of the function passed to reduceByKey() have the type V of the PairRDD. For the same result, reduceByKey() runs much more efficiently than groupByKey() followed by reduce().

matrix = [
    (1, 1, 1),
    (1, 2, 1),
    (1, 3, 1),
    (1, 6, 0),
    (2, 6, 1),
    (3, 1, 1),
    (3, 5, 1),
    (4, 1, 0),
    (4, 4, 1),
]
df = spark.createDataFrame(matrix, ['k', 'v', 'whatever'])

df.rdd \
    .map(lambda row: (row[0], row[1])) \
    .reduceByKey(lambda x, y: x + y) \
    .collect()
# map() => [(1, 1), (1, 2), (1, 3), (1, 6), (2, 6), (3, 1), (3, 5), (4, 1), (4, 4)]
# reduceByKey() => [(1, 12), (2, 6), (3, 6), (4, 5)]
from pyspark.sql.types import ArrayType, FloatType, IntegerType, StructField, StructType

matrix = [
    (1, [[100, 1.0], [200, 2.0]]),
    (2, [[300, 3.0]]),
    (2, [[400, 4.0], [600, 6.0]]),
    (2, [[500, 5.0]]),
]
# an explicit schema is needed so that each recommendation becomes a struct
# with named item and rating fields (used as recommendation.rating below)
recommendation_schema = StructType([
    StructField('item', IntegerType(), nullable=False),
    StructField('rating', FloatType(), nullable=False),
])
user_recommendations_schema = StructType([
    StructField('user', IntegerType(), nullable=False),
    StructField('recommendations', ArrayType(recommendation_schema), nullable=False),
])
df = spark.createDataFrame(matrix, user_recommendations_schema)

def merge_recommendations(recommendations1, recommendations2):
    return recommendations1 + recommendations2

def slice_recommendations(row, candidate_k):
    user, recommendations = row
    sliced_recommendations = sorted(recommendations, key=lambda recommendation: recommendation.rating, reverse=True)[:candidate_k + 5]
    return (user, sliced_recommendations)

full_rdd = df \
    .rdd \
    .reduceByKey(lambda x, y: merge_recommendations(x, y)) \
    .map(lambda row: slice_recommendations(row, candidate_k))
final_df = spark.createDataFrame(full_rdd, ['user', 'recommendations'])
val langs = List(
  "Java", "PHP", "Python", "Scala"
)

val wikiRdd: RDD[WikipediaArticle] = sc.parallelize(Seq(
  WikipediaArticle("a1", "a1 Scala"),
  WikipediaArticle("a2", "a2 Python"),
  WikipediaArticle("a3", "a3 Scala"),
  WikipediaArticle("a4", "a4 Java"),
  WikipediaArticle("a5", "a5 Java"),
  WikipediaArticle("a6", "a6 Scala")
))

def rankLangsReduceByKey(langs: List[String], rdd: RDD[WikipediaArticle]): List[(String, Int)] = {
  rdd
    .flatMap((article: WikipediaArticle) => {
      langs
        .filter((lang: String) => {
          article.mentionsLanguage(lang)
        })
        .map((lang: String) => {
          (lang, 1)
        })
    })
    .reduceByKey((count1: Int, count2: Int) => {
      count1 + count2
    })
    .collect()
    .toList
    .sortWith(_._2 > _._2)
}

rankLangsReduceByKey(langs, wikiRdd)
// List((Scala,3),(Java,2),(Python,1),(PHP,0))

Avoid reduceByKey when the input and output value types are different
http://backtobazics.com/big-data/spark/apache-spark-reducebykey-example/

Reduce a key-value pair into a key-list pair
http://stackoverflow.com/questions/27002161/reduce-a-key-value-pair-into-a-key-list-pair-with-apache-spark

foldByKey()

Signature: def foldByKey(zeroValue: V)(func: (V, V) ⇒ V): RDD[(K, V)]

foldByKey() is basically reduceByKey() with an explicitly specified zero value.

matrix = [
    (1, 1, 1),
    (1, 2, 1),
    (1, 3, 1),
    (1, 6, 0),
    (2, 6, 1),
    (3, 1, 1),
    (3, 5, 1),
    (4, 1, 0),
    (4, 4, 1),
]
df = spark.createDataFrame(matrix, ['k', 'v', 'whatever'])

from operator import add

df.rdd \
    .map(lambda row: (row[0], [row[1], ])) \
    .foldByKey(list(), add) \
    .collect()
# [(4, [1, 4]), (1, [1, 2, 3, 6]), (2, [6]), (3, [1, 5])]

aggregateByKey()

Signature: def aggregateByKey[U](zeroValue: U)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U): RDD[(K, U)]

It is a good replacement for the .map(lambda row: (row['user'], [row['item'], ])).reduceByKey(lambda v1, v2: v1 + v2) pattern.

matrix = [
    (1, 1, 1),
    (1, 2, 1),
    (1, 4, 1),
    (1, 5, 1),
    (2, 2, 1),
    (2, 2, 1),
    (2, 3, 1),
    (3, 5, 1),
]
df = spark.createDataFrame(matrix, ['user', 'item', 'whatever'])

def seqFunc(item_set, item):
    item_set.add(item)
    return item_set

def combFunc(item_set1, item_set2):
    return item_set1.union(item_set2)

df.select('user', 'item').rdd \
    .aggregateByKey(set(), seqFunc, combFunc) \
    .collect()
# [(1, {1, 2, 4, 5}), (2, {2, 3}), (3, {5})]

ref:
http://codingjunkie.net/spark-agr-by-key/
https://stackoverflow.com/questions/31081563/apache-spark-what-is-the-equivalent-implementation-of-rdd-groupbykey-using-rd
https://stackoverflow.com/questions/24804619/how-does-spark-aggregate-function-aggregatebykey-work

combineByKey()

Signature: def combineByKey[C](createCombiner: (V) ⇒ C, mergeValue: (C, V) ⇒ C, mergeCombiners: (C, C) ⇒ C): RDD[(K, C)]

It turns an RDD[(K, V)] into an RDD[(K, C)], where C can be any type.

combineByKey() takes three functions:

  • createCombiner, which turns a V into a C (e.g., creates a one-element list)
  • mergeValue, to merge a V into a C (e.g., adds it to the end of a list)
  • mergeCombiners, to combine two C’s into a single one.

matrix = [
    ('chinese', 80),
    ('math', 60),
    ('english', 100),
    ('chinese', 90),
    ('math', 100),
    ('math', 10),
    ('english', 70),
    ('english', 20),
    ('history', 30),
]
df = spark.createDataFrame(matrix, ['subject', 'score'])

def createCombiner(score):
    return (score, 1)

def mergeValue(accumulate, score):
    total_score = accumulate[0] + score
    total_count = accumulate[1] + 1
    return (total_score, total_count)

def mergeCombiners(accumulate1, accumulate2):
    total_score = accumulate1[0] + accumulate2[0]
    total_count = accumulate1[1] + accumulate2[1]
    return (total_score, total_count)

df.rdd.combineByKey(createCombiner, mergeValue, mergeCombiners).collect()
# you could calculate the average score of every subject
# [('chinese', (170, 2)),
#  ('history', (30, 1)),
#  ('math', (170, 3)),
#  ('english', (190, 3))]

ref:
https://zhangyi.gitbooks.io/spark-in-action/content/chapter2/rdd.html

cogroup()

If you need to join two RDDs that have already been grouped by key, use cogroup(); avoid the flatMap + join + groupBy pattern.
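
A minimal sketch with two small, made-up pair RDDs:

rdd1 = sc.parallelize([(1, 'a'), (1, 'b'), (2, 'c')])
rdd2 = sc.parallelize([(1, 'x'), (3, 'y')])

# cogroup() groups the values of both RDDs by key in a single pass, yielding
# (key, (iterable of values from rdd1, iterable of values from rdd2)) pairs
sorted((key, (sorted(values1), sorted(values2))) for key, (values1, values2) in rdd1.cogroup(rdd2).collect())
# [(1, (['a', 'b'], ['x'])), (2, (['c'], [])), (3, ([], ['y']))]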

Setup Spark on macOS

Install

First, you need the Java 8 JDK.
http://www.oracle.com/technetwork/java/javase/downloads/index.html

$ java -version
java version "1.8.0_131"

Homebrew Version

$ brew update
$ brew install maven apache-spark

Pre-built Version

$ mkdir -p /usr/local/share/apache-spark && \
  cd /usr/local/share/apache-spark && \
  wget https://www.apache.org/dyn/closer.lua/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz && \
  tar -xvzf spark-2.2.0-bin-hadoop2.7.tgz

ref:
http://spark.apache.org/downloads.html

Build Version

This is the recommended way.

$ brew install scala@2.11
$ export PATH="/usr/local/opt/scala@2.11/bin:$PATH"
$ scala -version
Scala code runner version 2.11.8 -- Copyright 2002-2016, LAMP/EPFL

$ mkdir -p /usr/local/share/apache-spark && \
  cd /usr/local/share/apache-spark && \
  wget https://d3kbcqa49mib13.cloudfront.net/spark-2.2.0.tgz && \
  tar -xvzf spark-2.2.0.tgz && \
  cd spark-2.2.0

$ ./build/mvn -Pnetlib-lgpl -Pyarn -Phadoop-2.7 -Dhadoop.version=2.7.3 -DskipTests -T 4C package
# or
$ ./build/mvn -Pnetlib-lgpl -Pyarn -Phadoop-2.7 -Dhadoop.version=2.7.3 -DskipTests clean package
$ spark-shell --packages "com.github.fommil.netlib:all:1.1.2"
scala> import com.github.fommil.netlib.BLAS
import com.github.fommil.netlib.BLAS
scala> BLAS.getInstance().getClass().getName()
res1: String = com.github.fommil.netlib.NativeSystemBLAS

ref:
http://spark.apache.org/downloads.html
http://spark.apache.org/docs/latest/building-spark.html
http://spark.apache.org/docs/latest/ml-guide.html#dependencies

Configurations

in .zshrc

if which java > /dev/null; then
  export JAVA_HOME="$(/usr/libexec/java_home -v 1.8)"
  export PATH="$JAVA_HOME/bin:$PATH"
fi

export PATH="/usr/local/opt/scala@2.11/bin:$PATH"

# homebrew version
export SPARK_HOME="/usr/local/Cellar/apache-spark/2.2.0/libexec"
export PYTHONPATH="$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.4-src.zip:$PYTHONPATH"
export PYSPARK_DRIVER_PYTHON="ipython"

# pre-built version
export SPARK_HOME="/usr/local/share/apache-spark/spark-2.2.0-bin-hadoop2.7"
export PATH="$SPARK_HOME/bin:$PATH"
export PYTHONPATH="$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.4-src.zip:$PYTHONPATH"

# build version
export SPARK_HOME="/usr/local/share/apache-spark/spark-2.2.0"
export PATH="$SPARK_HOME/bin:$PATH"
export PYTHONPATH="$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.4-src.zip:$PYTHONPATH"

ref:
https://spark.apache.org/docs/latest/programming-guide.html
https://spark.apache.org/docs/latest/configuration.html

$ cd $SPARK_HOME

$ cp conf/spark-defaults.conf.template conf/spark-defaults.conf
spark.driver.memory              4g
spark.executor.memory            4g
spark.jars.packages              com.github.fommil.netlib:all:1.1.2,mysql:mysql-connector-java:5.1.41
spark.serializer                 org.apache.spark.serializer.KryoSerializer

$ cp conf/spark-env.sh.template conf/spark-env.sh
export PYTHONHASHSEED=42

$ cp conf/log4j.properties.template conf/log4j.properties

ref:
https://spark.apache.org/docs/latest/configuration.html

Commands

Local Mode

$ spark-shell

$ export PYSPARK_DRIVER_PYTHON="jupyter" && \
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --ip 0.0.0.0" && \
pyspark \
--packages "com.github.fommil.netlib:all:1.1.2,mysql:mysql-connector-java:5.1.41" \
--driver-memory 4g \
--executor-memory 4g \
--master "local[*]"

$ spark-shell \
--packages "com.github.fommil.netlib:all:1.1.2,mysql:mysql-connector-java:5.1.41" \
--master "local-cluster[3, 1, 4096]"

# Spark Application UI on the driver
$ open http://localhost:4040/

ref:
https://spark.apache.org/docs/latest/programming-guide.html

Standalone mode

There are two deploy modes for Spark Standalone. In client mode, the driver is launched in the same process as the client that submits the application. In cluster mode, however, the driver is launched on one of the worker nodes.
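
You can pick the deploy mode with --deploy-mode (client is the default). Note that standalone cluster mode only supports JVM applications, not Python ones, so this sketch reuses the class and jar from the spark-submit example further down:

$ spark-submit \
--master spark://localhost:7077 \
--deploy-mode cluster \
--class ws.vinta.albedo.LogisticRegressionRanker \
target/albedo-1.0.0-SNAPSHOT.jar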

$ ./sbin/start-master.sh -h localhost
$ ./sbin/start-slave.sh spark://localhost:7077

# Spark Web UI on the cluster manager
$ open http://localhost:8080/

$ pyspark \
--driver-memory 4g \
--executor-memory 4g \
--master spark://localhost:7077

$ spark-submit \
--master spark://localhost:7077 \
examples/src/main/python/pi.py 10

$ spark-submit \
--driver-memory 2g \
--driver-java-options "-XX:ThreadStackSize=81920" \
--total-executor-cores 3 \
--executor-cores 3 \
--executor-memory 12g \
--conf "spark.executor.extraJavaOptions=-XX:ThreadStackSize=81920" \
--master spark://localhost:7077 \
--packages "mysql:mysql-connector-java:5.1.41,com.hankcs:hanlp:portable-1.3.4,edu.stanford.nlp:stanford-corenlp:3.7.0" \
--jars "/Users/vinta/Projects/albedo/spark-data/stanford-corenlp-3.8.0-models.jar" \
--class ws.vinta.albedo.LogisticRegressionRanker \
target/albedo-1.0.0-SNAPSHOT.jar

# Spark Application UI on the driver
$ open http://localhost:4040/

ref:
https://spark.apache.org/docs/latest/spark-standalone.html
https://spark.apache.org/docs/latest/submitting-applications.html
https://spark.apache.org/docs/latest/configuration.html