Spark SQL cookbook (Python)

Access Spark session

from pyspark.sql import SparkSession

# get the default SparkSession instance
spark = SparkSession.builder.getOrCreate()

sc = spark.sparkContext
sc.setLogLevel('INFO')

Create an RDD

array = [
    1,
    5,
    7,
    2,
    3,
]
rdd = sc.parallelize(array)
df = rdd.toDF('int')
# +-----+
# |value|
# +-----+
# |    1|
# |    5|
# |    7|
# |    2|
# |    3|
# +-----+

Read data from MySQL

$ pyspark --packages "mysql:mysql-connector-java:5.1.41"

url = 'jdbc:mysql://127.0.0.1:3306/albedo'
properties = {'user': 'root', 'password': '123'}
df = spark.read.jdbc(url, 'app_repostarring', properties=properties)

ref:
https://dev.mysql.com/downloads/connector/j/
http://www.gatorsmile.io/numpartitionsinjdbc/

Read data from SQLite

$ pyspark --packages "org.xerial:sqlite-jdbc:3.16.1"

from pyspark.sql.functions import lit
from pyspark.sql.types import IntegerType, StructField, StructType

props = {'driver': 'org.sqlite.JDBC', 'date_string_format': 'yyyy-MM-dd HH:mm:ss'}
df = spark.read.jdbc("jdbc:sqlite:db.sqlite3", "app_repostarring", properties=props)

min_stargazers_count = 10  # hypothetical threshold, adjust to taste
df = df.where(df.stargazers_count >= min_stargazers_count)
df = df.select('from_user_id', 'repo_id')
df = df.toDF('user', 'item') 
df = df.withColumn('rating', lit(1))

schema = StructType([
    StructField('user', IntegerType(), nullable=False),
    StructField('item', IntegerType(), nullable=False),
    StructField('rating', IntegerType(), nullable=False),
])
df = spark.createDataFrame(df.rdd, schema)

ref:
https://github.com/xerial/sqlite-jdbc

java.text.ParseException: Unparseable date: "2016-04-22 17:26:54"
https://github.com/xerial/sqlite-jdbc/issues/88

Create a DataFrame

matrix = [
    (1, 1, 1),
    (1, 2, 1),
    (2, 1, 0),
    (3, 1, 1),
    (3, 3, 1),
    (3, 4, 1),
    (4, 1, 0),
    (4, 2, 0),
    (5, 9, 1),
    (5, 5, 0),
]
df = spark.createDataFrame(matrix, ['user', 'item', 'rating'])

ref:
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.SparkSession.createDataFrame

Create a DataFrame with explicit schema

from pyspark.sql.types import *

matrix = [
    ('Alice', 0.5, 5.0),
    ('Bob',   0.2, 92.0),
    ('Tom',   0.0, 122.0),
    ('Zack',  0.1, 1.0),
]
schema = StructType([
    StructField('name', StringType(), nullable=False),
    StructField('prediction', DoubleType(), nullable=False),
    StructField('rating', DoubleType(), nullable=False)
])
df = spark.createDataFrame(matrix, schema)
df.printSchema()

newDF = spark.createDataFrame(someRDD, df.schema)

ref:
https://github.com/awesome-spark/spark-gotchas/blob/master/05_spark_sql_and_dataset_api.md

Change schema of a DataFrame

from pyspark.sql.types import DoubleType, IntegerType, StructField, StructType

schema = StructType([
    StructField('user', IntegerType(), nullable=False),
    StructField('item', IntegerType(), nullable=False),
    StructField('rating', DoubleType(), nullable=False),
])
df = spark.createDataFrame(df.rdd, schema)

Get the number of partitions

df.rdd.getNumPartitions()

Show a DataFrame

# print the schema in a tree format
df.printSchema()

# print top 20 rows
df.show()

# print top 100 rows
df.show(100)

# calculate the descriptive statistics
df.describe().show()
df.describe(['like_count', ]).show()

ref:
http://spark.apache.org/docs/latest/sql-programming-guide.html#untyped-dataset-operations-aka-dataframe-operations

Create a column with a literal value

from pyspark.sql.functions import lit

df = df.withColumn('like', lit(1))

ref:
http://stackoverflow.com/questions/33681487/how-do-i-add-a-new-column-to-a-spark-dataframe-using-pyspark

Return a fraction of a DataFrame

fraction = {
    'u1': 0.5,
    'u2': 0.5,
    'u3': 0.5,
    'u4': 0.5,
    'u5': 0.5,
}
df.sampleBy('user', fraction).show()
# +----+----+----------+
# |user|item|prediction|
# +----+----+----------+
# |  u1|  i1|         1|
# |  u3|  i4|         4|
# |  u4|  i1|         1|
# |  u4|  i2|         3|
# |  u5|  i5|         5|
# +----+----+----------+

Show distinct values of a column

df.select('user').distinct().show()

ref:
https://www.analyticsvidhya.com/blog/2016/10/spark-dataframe-and-operations/

Rename columns

df.printSchema()
# root
# |-- from_user_id: integer (nullable = true)
# |-- repo_id: integer (nullable = true)

df = df.toDF('user', 'item')

Convert a column to double type

df = df.withColumn('prediction', df['prediction'].cast('double'))

ref:
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.Column.cast

Update a column based on a condition

from pyspark.sql.functions import when

df = df.withColumn('label', when(df['prediction'] > 0.5, 1).otherwise(0))

ref:
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.when
http://stackoverflow.com/questions/34908448/spark-add-column-to-dataframe-conditionally

Drop rows with null values

predictions = predictions.dropna(subset=['prediction',])

ref:
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.dropna

Subtract one DataFrame from another

testingRDD = df.rdd.subtract(training.rdd)
testing = spark.createDataFrame(testingRDD, df.schema)

Convert a DataFrame column into a Python list

popular_items = [row['item'] for row in popular_df.select('item').collect()]

Concatenate (merge) two DataFrames

full_df = df1.union(df2)

Convert a DataFrame to a Python dict

# the DataFrame must have exactly two columns: (key, value)
d = df.rdd.collectAsMap()
d['some_key']

Broadcast a value

bc_candidates = sc.broadcast(set([1, 2, 4, 5, 8]))
print(bc_candidates.value)
# {8, 1, 2, 4, 5}

bc_df = sc.broadcast(df.collect())
df = spark.createDataFrame(bc_df.value)

ref:
http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables

Broadcast a DataFrame in join

import pyspark.sql.functions as F

large_df.join(F.broadcast(small_df), 'some_key')

ref:
http://stackoverflow.com/questions/34053302/pyspark-and-broadcast-join-example
https://chapeau.freevariable.com/2014/09/improving-spark-application-performance.html

Cache a DataFrame

df.cache()

ref:
http://stackoverflow.com/questions/38056774/spark-cache-vs-broadcast

Show query execution plan

df.explain(extended=True)

Use SQL with a DataFrame

props = {'driver': 'org.sqlite.JDBC', 'date_string_format': 'yyyy-MM-dd HH:mm:ss'}
df = spark.read.jdbc("jdbc:sqlite:db.sqlite3", "app_repostarring", properties=props)
df.createOrReplaceTempView('repo_stars')

query = 'SELECT DISTINCT repo_id AS item FROM repo_stars WHERE stargazers_count > 1000'
df2 = spark.sql(query)
df2.show()

query = """
SELECT
    from_user_id AS user,
    count(repo_id) AS count
FROM repo_stars
GROUP BY from_user_id
ORDER BY count DESC
"""
df = spark.sql(query)

params = {'top_n': top_n}
query = """
SELECT 
    repo_id AS item,
    MAX(stargazers_count) AS stars
FROM repo_stars
GROUP BY repo_id
ORDER BY stars DESC
LIMIT {top_n}
""".format(**params)
popular_df = spark.sql(query)

ref:
https://sparkour.urizone.net/recipes/using-sql-udf/

WHERE ... IN ...

from pyspark.sql.functions import col

itemIDs = [1, 2, 5, 8]
rawDF \
    .where(col('repo_id').isin(itemIDs)) \
    .select('repo_id', 'repo_url') \
    .rdd \
    .collectAsMap()

ORDER BY ... multiple columns

import pyspark.sql.functions as F

ratingDF = rawDF \
    .selectExpr('from_user_id AS user', 'repo_id AS item', '1 AS rating', 'starred_at') \
    .orderBy('user', F.col('starred_at').desc())

Aggregate

import pyspark.sql.functions as F

df.agg(F.min('user'), F.max('user'), F.min('item'), F.max('item')).show()

max_value = userStarCountDF.agg(F.max('stars').alias('max_stars')).collect()[0]['max_stars']

SELECT COUNT(DISTINCT xxx) ...

import pyspark.sql.functions as F

matrix = [
    (1, 1, 1),
    (1, 2, 1),
    (2, 1, 0),
    (3, 1, 1),
    (3, 3, 1),
    (3, 4, 1),
    (4, 1, 0),
    (4, 2, 0),
    (5, 9, 1),
    (5, 5, 0),
]
df = spark.createDataFrame(matrix, ['user', 'item', 'rating'])
df.agg(F.countDistinct('user')).show()
# +--------------------+
# |count(DISTINCT user)|
# +--------------------+
# |                   5|
# +--------------------+

ref:
http://stackoverflow.com/questions/40888946/spark-dataframe-count-distinct-values-of-every-column

SELECT MAX(xxx) ... GROUP BY

df.groupBy('user').count().filter('count >= 4').show()

popular_df = raw_df \
    .groupBy('repo_id') \
    .agg(F.max('stargazers_count').alias('stars')) \
    .orderBy('stars', ascending=False) \
    .limit(top_n)

ref:
http://stackoverflow.com/questions/30616380/spark-how-to-count-number-of-records-by-key

SELECT COUNT() ... GROUP BY ...

stargazersCountDF = ratingDF \
    .groupBy('item') \
    .agg(F.count('user').alias('stargazers_count')) \
    .orderBy('stargazers_count', ascending=False)
stargazersCountDF.show()
# +--------+----------------+
# |    item|stargazers_count|
# +--------+----------------+
# | 3544424|             137|
# | 2126244|             120|
# | 7691631|             115|
# | 1362490|             112|
# |  943149|             112|
# +--------+----------------+

starredCountDF = ratingDF \
    .groupBy('user') \
    .agg(F.count('item').alias('starred_count')) \
    .orderBy('starred_count', ascending=False)
starredCountDF.show()
# +-------+-------------+
# |   user|starred_count|
# +-------+-------------+
# |  48936|         7201|
# |4560482|         5898|
# |3382565|         3978|
# |  10652|         3586|
# |  31996|         3459|
# +-------+-------------+

For large datasets, you may want to use approx_count_distinct() to trade a little accuracy for speed.
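
A minimal sketch, assuming Spark 2.1+ where pyspark.sql.functions.approx_count_distinct is available and reusing the df with a user column from above; rsd is the maximum allowed relative error:

import pyspark.sql.functions as F

# approximate distinct count (HyperLogLog++), much cheaper than an exact countDistinct
df.agg(F.approx_count_distinct('user', rsd=0.05).alias('distinct_users')).show()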

GROUP_CONCAT a column

from pyspark.sql.functions import expr

perUserPredictionsDF = outputDF \
  .orderBy(['user', 'prediction'], ascending=False) \
  .groupBy('user') \
  .agg(expr('collect_list(item) as items'))
# +--------+--------------------+
# |    user|               items|
# +--------+--------------------+
# |    2142|[36560369, 197450...|
# |   47217|[40693501, 643554...|
# +--------+--------------------+

ref:
http://stackoverflow.com/questions/31640729/spark-sql-replacement-for-mysql-group-concat-aggregate-function

SELECT ... RANK() OVER (PARTITION BY ... ORDER BY ... DESC)

from pyspark.sql import Window
from pyspark.sql.functions import col
from pyspark.sql.functions import expr
import pyspark.sql.functions as F

windowSpec = Window.partitionBy('from_user_id').orderBy(col('starred_at').desc())
perUserActualItemsDF = rawDF \
    .select('from_user_id', 'repo_id', 'starred_at', F.rank().over(windowSpec).alias('rank')) \
    .where('rank <= 10') \
    .groupBy('from_user_id') \
    .agg(expr('collect_list(repo_id) as items')) \
    .withColumnRenamed('from_user_id', 'user')
perUserActualItemsDF.show()
# +--------+--------------------+
# |    user|               items|
# +--------+--------------------+
# |    2142|[29122050, 634846...|
# |   59990|[9820191, 8729416...|
# +--------+--------------------+

windowSpec = Window.partitionBy('user').orderBy(col('prediction').desc())
perUserPredictedItemsDF = outputDF \
    .select('user', 'item', 'prediction', F.rank().over(windowSpec).alias('rank')) \
    .where('rank <= 10') \
    .groupBy('user') \
    .agg(expr('collect_list(item) as items'))
perUserPredictedItemsDF.show()
# +--------+--------------------+
# |    user|               items|
# +--------+--------------------+
# |    2142|[36560369, 197450...|
# |   47217|[40693501, 643554...|
# +--------+--------------------+

ref:
https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
https://github.com/awesome-spark/spark-gotchas/blob/master/05_spark_sql_and_dataset_api.md#window-functions

Left anti join / Left excluding join

cleanDF = ratingDF.join(toRemoveItem, 'item', 'left_anti')

ref:
https://www.codeproject.com/Articles/33052/Visual-Representation-of-SQL-Joins

Outer join

m1 = [
    (1, 1, 1),
    (1, 2, 1),
    (1, 4, 1),
    (2, 2, 1),
    (2, 3, 1),
    (3, 5, 1),
]
m1df = spark.createDataFrame(m1, ['user', 'item', 'rating'])
m1df = m1df.where('user = 1').alias('m1df')

m2 = [
    (1, 100),
    (2, 200),
    (3, 300),
    (4, 400),
    (5, 500),
    (6, 600),
]
m2df = spark.createDataFrame(m2, ['item', 'count'])
m2df = m2df.alias('m2df')

m1df.join(m2df, m1df.item == m2df.item, 'rightouter') \
    .where('m1df.user IS NULL') \
    .orderBy('m2df.count', ascending=False) \
    .selectExpr('1 AS user', 'm2df.item', '0 AS rating') \
    .show()
# +----+----+------+
# |user|item|rating|
# +----+----+------+
# |   1|   6|     0|
# |   1|   5|     0|
# |   1|   3|     0|
# +----+----+------+

ref:
https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-sql-joins.html

Cross join

df = m1df.select('user').distinct().crossJoin(m2df.select('item'))
df.show()

query = """
SELECT f1.user, f2.item, 0 AS rating
FROM f1
CROSS JOIN f2
"""
df = spark.sql(query)
df.show()

ref:
http://stackoverflow.com/questions/5464131/finding-pairs-that-do-not-exist-in-a-different-table

Spark RDD methods (Python)

RDDs have actions, which return values, and transformations, which return new RDDs.
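
For example (a minimal sketch, reusing the sc from the previous section): transformations such as map() are lazy, while actions such as count() or take() trigger the actual computation.

rdd = sc.parallelize(range(10))

doubled = rdd.map(lambda x: x * 2)  # transformation: nothing is computed yet
doubled.count()                     # action: runs the job, returns 10
doubled.take(3)                     # action: returns [0, 2, 4]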

ref:
http://spark.apache.org/docs/latest/programming-guide.html#transformations
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions

map()

map() takes a function func(row) whose argument is each element of the RDD (from a DataFrame point of view, each row) and which returns an arbitrary object (for example an int, a string, or a tuple). So map() produces an RDD with the same number of rows as the input.

import pyspark.sql.functions as F

matrix = [
    (1, 1, 1),
    (1, 2, 1),
    (1, 3, 1),
    (1, 6, 0),
    (2, 6, 1),
    (3, 1, 1),
    (3, 5, 1),
    (4, 1, 0),
    (4, 4, 1),
]
df = spark.createDataFrame(matrix, ['k', 'v', 'whatever'])

df.rdd.map(lambda row: (row[0], row[1])).collect()
# [(1, 1), (1, 2), (1, 3), (1, 6), (2, 6), (3, 1), (3, 5), (4, 1), (4, 4)]

flatMap()

flatMap() is similar to map(): it takes a function func(row) whose argument is each row of the RDD. The difference is that the function must return an iterable (for example a tuple or list, which may be empty), and flatMap() flattens the returned results. So the resulting RDD may be larger or smaller than the original one.

matrix = [
    (1, 1, 1),
    (1, 2, 1),
    (1, 3, 1),
    (1, 6, 0),
    (2, 6, 1),
    (3, 1, 1),
    (3, 5, 1),
    (4, 1, 0),
    (4, 4, 1),
]
df = spark.createDataFrame(matrix, ['k', 'v', 'whatever'])

df.rdd.flatMap(lambda row: (row[0], row[1])).collect()
# [1, 1, 1, 2, 1, 3, 1, 6, 2, 6, 3, 1, 3, 5, 4, 1, 4, 4]

df.rdd.flatMap(lambda row: (row[2], )).collect()
# [1, 1, 1, 0, 1, 1, 1, 0, 1]

ref:
http://apachesparkbook.blogspot.tw/2015/05/difference-between-map-flatmap-in-rdd.html
http://stackoverflow.com/questions/42867457/map-each-element-of-a-list-in-spark
http://stackoverflow.com/questions/21096432/list-or-iterator-of-tuples-returned-by-map-pyspark

reduce()

reduce() takes a function func(element1, element2) that is applied to pairs of RDD elements and returns a single object; the whole reduce() finally yields one value.

array = [
    1,
    5,
    4,
    2,
    3,
]
rdd = sc.parallelize(array)

rdd.reduce(lambda element1, element2: element1 + element2)
# 15

def max(element1, element2):
    return element1 if element1 > element2 else element2

rdd.reduce(max)
# 5

treeReduce()

A plain reduce() sends every partition's partial result straight back to the driver machine for the final computation, which can become a bottleneck when there are many partitions or each partition holds a lot of data. In that case you can switch to treeReduce(), although using it inappropriately may actually hurt performance.

aggregate() and treeAggregate() have the same relationship.
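
A minimal sketch of both tree variants (reusing sc); the depth argument controls how many levels of partial aggregation run on the executors before the final result reaches the driver:

rdd = sc.parallelize(range(1, 101), 16)

# treeReduce(func, depth): partial results are combined in `depth` levels before hitting the driver
rdd.treeReduce(lambda x, y: x + y, depth=2)
# 5050

# treeAggregate(zeroValue, seqOp, combOp, depth)
rdd.treeAggregate(0, lambda acc, x: acc + x, lambda a, b: a + b, depth=2)
# 5050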

ref:
https://umbertogriffo.gitbooks.io/apache-spark-best-practices-and-tuning/content/treereduce_and_treeaggregate_demystified.html

groupByKey()

In the following, the "key" refers to row[0].

In many cases it is recommended to avoid groupByKey() and use reduceByKey(), aggregateByKey(), foldByKey(), or combineByKey() instead; a minimal comparison follows the list below.

When to avoid groupByKey():

  • If the operation is expressed as groupByKey followed by an associative and commutative reduction on the values (sum, count, max / min), it should be replaced by reduceByKey.
  • If the operation can be expressed as a combination of a local sequence operation and a merge operation (online variance / mean, top-n observations), it should be expressed with combineByKey or aggregateByKey.
  • If the final goal is to traverse values in a specific order (groupByKey followed by sorting the values followed by iteration), it can typically be rewritten as repartitionAndSortWithinPartitions with a custom partitioner and ordering, followed by mapPartitions.
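
A minimal comparison of the first point above, with hypothetical data; both versions give the same result, but reduceByKey() combines values within each partition before shuffling:

pairs = sc.parallelize([(1, 1), (1, 2), (1, 3), (2, 6), (3, 1), (3, 5)])

# groupByKey() shuffles every value, then the summing happens afterwards
pairs.groupByKey().mapValues(sum).collect()
# [(1, 6), (2, 6), (3, 6)]  (ordering may vary)

# reduceByKey() pre-aggregates on each partition, so less data is shuffled
pairs.reduceByKey(lambda x, y: x + y).collect()
# [(1, 6), (2, 6), (3, 6)]  (ordering may vary)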

Spark Best Practices
https://github.com/beeva/beeva-best-practices/blob/master/big_data/spark/README.md

RDD actions and Transformations by Example
https://github.com/awesome-spark/spark-gotchas/blob/master/04_rdd_actions_and_transformations_by_example.md

Avoid groupByKey when performing an associative reductive operation
https://umbertogriffo.gitbooks.io/apache-spark-best-practices-and-tuning/content/avoid_groupbykey_when_performing_an_associative_re.html

reduceByKey()

matrix = [
    (1, 1, 1),
    (1, 2, 1),
    (1, 3, 1),
    (1, 6, 0),
    (2, 6, 1),
    (3, 1, 1),
    (3, 5, 1),
    (4, 1, 0),
    (4, 4, 1),
]
df = spark.createDataFrame(matrix, ['k', 'v', 'whatever'])

df.rdd \
    .map(lambda row: (row[0], row[1])) \
    .reduceByKey(lambda x, y: x + y) \
    .collect()
# map() => [(1, 1), (1, 2), (1, 3), (1, 6), (2, 6), (3, 1), (3, 5), (4, 1), (4, 4)]
# reduceByKey() => [(1, 12), (2, 6), (3, 6), (4, 5)]

Avoid reduceByKey when the input and output value types are different
http://backtobazics.com/big-data/spark/apache-spark-reducebykey-example/

Reduce a key-value pair into a key-list pair
http://stackoverflow.com/questions/27002161/reduce-a-key-value-pair-into-a-key-list-pair-with-apache-spark

foldByKey()

foldByKey() is essentially reduceByKey() with an explicitly specified zero value.

matrix = [
    (1, 1, 1),
    (1, 2, 1),
    (1, 3, 1),
    (1, 6, 0),
    (2, 6, 1),
    (3, 1, 1),
    (3, 5, 1),
    (4, 1, 0),
    (4, 4, 1),
]
df = spark.createDataFrame(matrix, ['k', 'v', 'whatever'])

from operator import add

df.rdd \
    .map(lambda row: (row[0], [row[1], ])) \
    .foldByKey(list(), add) \
    .collect()
# [(4, [1, 4]), (1, [1, 2, 3, 6]), (2, [6]), (3, [1, 5])]

aggregateByKey()

It can replace the .map(lambda row: (row['user'], [row['item'], ])).reduceByKey(lambda v1, v2: v1 + v2) pattern.

matrix = [
    (1, 1, 1),
    (1, 2, 1),
    (1, 4, 1),
    (1, 5, 1),
    (2, 2, 1),
    (2, 2, 1),
    (2, 3, 1),
    (3, 5, 1),
]
df = spark.createDataFrame(matrix, ['k', 'v', 'whatever'])

def seqFunc(item_set, item):
    item_set.add(item)
    return item_set

def combFunc(item_set1, item_set2):
    return item_set1.union(item_set2)

df.select('k', 'v').rdd \
    .aggregateByKey(set(), seqFunc, combFunc) \
    .collect()
# [(1, {1, 2, 4, 5}), (2, {2, 3}), (3, {5})]

ref:
http://codingjunkie.net/spark-agr-by-key/
http://stackoverflow.com/questions/31081563/apache-spark-what-is-the-equivalent-implementation-of-rdd-groupbykey-using-rd

combineByKey()

combineByKey() turns an RDD[(K, V)] into an RDD[(K, C)], where C can be any type. It is essentially a more general aggregateByKey(): instead of a fixed zero value, you supply createCombiner to build the initial combiner from the first value seen for each key.

combineByKey() takes three functions:

  • createCombiner, which turns a V into a C (e.g., creates a one-element list)
  • mergeValue, to merge a V into a C (e.g., adds it to the end of a list)
  • mergeCombiners, to combine two C’s into a single one.

import pyspark.sql.functions as F

matrix = [
    ('chinese', 80),
    ('math', 60),
    ('english', 100),
    ('chinese', 90),
    ('math', 100),
    ('math', 10),
    ('english', 70),
    ('english', 20),
    ('history', 30),
]
df = spark.createDataFrame(matrix, ['subject', 'score'])

def createCombiner(score):
    return (score, 1)

def mergeValue(accumulate, score):
    total_score = accumulate[0] + score
    total_count = accumulate[1] + 1
    return (total_score, total_count)

def mergeCombiners(accumulate1, accumulate2):
    total_score = accumulate1[0] + accumulate2[0]
    total_count = accumulate1[1] + accumulate2[1]
    return (total_score, total_count)

df.rdd.combineByKey(createCombiner, mergeValue, mergeCombiners).collect()
# you could calculate the average score of every subject
# [('chinese', (170, 2)),
#  ('history', (30, 1)),
#  ('math', (170, 3)),
#  ('english', (190, 3))]

ref:
https://zhangyi.gitbooks.io/spark-in-action/content/chapter2/rdd.html

cogroup()

If you need to join two RDDs that have already been grouped by key, use cogroup(); avoid the flatMap + join + groupBy pattern.
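
A minimal sketch with two small pair RDDs (hypothetical data):

rdd1 = sc.parallelize([(1, 'a'), (1, 'b'), (2, 'c')])
rdd2 = sc.parallelize([(1, 'x'), (3, 'y')])

# cogroup() groups both RDDs by key in a single pass;
# each value is a pair of iterables, one per source RDD
rdd1.cogroup(rdd2) \
    .mapValues(lambda values: (list(values[0]), list(values[1]))) \
    .collect()
# [(1, (['a', 'b'], ['x'])), (2, (['c'], [])), (3, ([], ['y']))]  (ordering may vary)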

collect()

If your RDD is large, don't casually call collect() or count(): collect() pulls all the data into the driver machine, which can easily run out of memory, and count() still forces a full pass over the data. Use take() or takeSample() instead.
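
For instance (a minimal sketch):

rdd = sc.parallelize(range(1000000))

rdd.take(5)
# [0, 1, 2, 3, 4]

# takeSample(withReplacement, num, seed)
rdd.takeSample(False, 3, 42)
# returns 3 randomly chosen elements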

ref:
https://umbertogriffo.gitbooks.io/apache-spark-best-practices-and-tuning/content/dont_collect_large_rdds.html

Setup Spark on macOS

Install

First, you need Java 8 JDK.
http://www.oracle.com/technetwork/java/javase/downloads/index.html

$ java -version
java version "1.8.0_131"

$ brew update
$ brew install hadoop apache-spark

ref:
https://gist.github.com/ololobus/4c221a0891775eaa86b0
https://github.com/zipfian/spark-install

Build

You could also build Spark on your computer.

$ brew install scala@2.11 sbt
$ export PATH="/usr/local/opt/scala@2.11/bin:$PATH"
$ scala -version
Scala code runner version 2.11.8 -- Copyright 2002-2016, LAMP/EPFL

$ mkdir -p /usr/local/share/spark && \
  cd /usr/local/share/spark && \
  wget http://d3kbcqa49mib13.cloudfront.net/spark-2.1.0.tgz && \
  tar -xvzf spark-2.1.0.tgz && \
  cd spark-2.1.0
$ ./build/mvn -Pyarn -Phadoop-2.7 -Pnetlib-lgpl -Dhadoop.version=2.7.0 -DskipTests clean package
$ spark-shell --packages "com.github.fommil.netlib:all:1.1.2"
scala> import com.github.fommil.netlib.BLAS
import com.github.fommil.netlib.BLAS
scala> println(BLAS.getInstance().getClass().getName())
com.github.fommil.netlib.NativeSystemBLAS

ref:
http://spark.apache.org/docs/latest/building-spark.html
http://spark.apache.org/docs/latest/ml-guide.html#dependencies

Docker

$ docker run --rm -it -p 8888:8888 -v $PWD:/home/jovyan/work jupyter/all-spark-notebook

ref:
https://github.com/jupyter/docker-stacks/tree/master/all-spark-notebook

Configuration

In ~/.zshrc:

if which java > /dev/null; then
  export JAVA_HOME="$(/usr/libexec/java_home -v 1.8)"
  export PATH="$JAVA_HOME/bin:$PATH"
fi

export PATH="/usr/local/opt/[email protected]/bin:$PATH"

# brew version
export SPARK_HOME="/usr/local/Cellar/apache-spark/2.1.0/libexec"
export PYTHONPATH="$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.4-src.zip:$PYTHONPATH"
export PYSPARK_DRIVER_PYTHON="ipython"

# build version
export PATH="/usr/local/share/spark/spark-2.1.0/bin:$PATH"
export SPARK_HOME="/usr/local/share/spark/spark-2.1.0"
export PYTHONPATH="$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.4-src.zip:$PYTHONPATH"
export PYSPARK_DRIVER_PYTHON="ipython"

ref:
https://spark.apache.org/docs/latest/programming-guide.html
https://spark.apache.org/docs/latest/configuration.html

$ cd $SPARK_HOME

$ cp conf/spark-defaults.conf.template conf/spark-defaults.conf
# spark.jars.packages mysql:mysql-connector-java:5.1.41
# spark.driver.memory 4g
# spark.executor.memory 4g

$ cp conf/spark-env.sh.template conf/spark-env.sh
# export PYTHONHASHSEED=42

ref:
https://spark.apache.org/docs/latest/configuration.html

Commands

Local mode (Interactive shell)

$ spark-shell \
--packages "mysql:mysql-connector-java:5.1.41"

$ export PYSPARK_DRIVER_PYTHON="jupyter" && \
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --ip 0.0.0.0" && \
pyspark \
--packages "com.github.fommil.netlib:all:1.1.2,mysql:mysql-connector-java:5.1.41" \
--driver-memory 4g \
--executor-memory 4g \
--master "local[*]"

# Spark Web UI of the driver
$ open http://localhost:4040/

ref:
https://spark.apache.org/docs/latest/programming-guide.html

Standalone mode

$ cd $SPARK_HOME
$ cp conf/spark-env.sh.template conf/spark-env.sh
$ echo "export PYTHONHASHSEED=42" >> conf/spark-env.sh

$ ./sbin/start-master.sh -h localhost
$ ./sbin/start-slave.sh spark://localhost:7077

# Spark Web UI on the cluster manager
$ open http://localhost:8080/

$ pyspark \
--driver-memory 4g \
--executor-memory 4g \
--master spark://localhost:7077
# or
$ spark-submit \
--master spark://localhost:7077 \
examples/src/main/python/pi.py 10

# Spark Application UI on the driver
$ open http://localhost:4040/

ref:
https://spark.apache.org/docs/latest/spark-standalone.html
https://spark.apache.org/docs/latest/submitting-applications.html