Spark SQL cookbook (Python)

Access SparkSession

from pyspark.sql import SparkSession

# get the default SparkSession instance
spark = SparkSession.builder.getOrCreate()

sc = spark.sparkContext
sc.setLogLevel('INFO')

Access custom configurations

$ spark-submit \
--master spark://localhost:7077 \
--properties-file my-properties.conf \
your_spark_app.py

# in my-properties.conf
# spark.myapp.db_host 192.168.100.10
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
print(sc.getConf().get('spark.myapp.db_host'))
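
Custom settings can also be passed directly with --conf and read back through spark.conf; a minimal sketch, reusing the same spark.myapp.db_host key:

$ spark-submit \
--master spark://localhost:7077 \
--conf spark.myapp.db_host=192.168.100.10 \
your_spark_app.py

# spark.conf exposes the session's runtime configuration
print(spark.conf.get('spark.myapp.db_host'))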

ref:
https://stackoverflow.com/questions/31115881/how-to-load-java-properties-file-and-use-in-spark

Create an RDD

array = [
    1,
    5,
    7,
    2,
    3,
]
rdd = sc.parallelize(array)
# passing the DDL type string 'int' yields a single column named "value"
df = rdd.toDF('int')
# +-----+
# |value|
# +-----+
# |    1|
# |    5|
# |    7|
# |    2|
# |    3|
# +-----+

Read data from MySQL

$ pyspark --packages "mysql:mysql-connector-java:5.1.41"
url = 'jdbc:mysql://127.0.0.1:3306/albedo'
properties = {'user': 'root', 'password': '123'}
df = spark.read.jdbc(url, 'app_repostarring', properties=properties)

ref:
https://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases
https://dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-configuration-properties.html

Write data to MySQL

CREATE TABLE `recommender_songrecommend` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `user_id` int(11) NOT NULL,
  `song_id` int(11) NOT NULL,
  `score` double NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1;
matrix = [
    (1, 2, 0.1),
    (3, 4, 0.23456),
    (5, 6, 7.89),
    (7, 8, -10.111213),
]
df = spark.createDataFrame(matrix, ['user', 'item', 'score'])

url = 'jdbc:mysql://192.168.11.200:3306/DATABASE?user=USER&password=PASSWORD&verifyServerCertificate=false&useSSL=false&rewriteBatchedStatements=true'
properties = {'driver': 'com.mysql.jdbc.Driver'}
df \
    .selectExpr('user AS user_id', 'item AS song_id', 'score') \
    .write.jdbc(url, table='recommender_songrecommend', mode='append', properties=properties)

ref:
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter.jdbc
https://stackoverflow.com/questions/2993251/jdbc-batch-insert-performance/10617768#10617768

Read data from SQLite

$ pyspark --packages "org.xerial:sqlite-jdbc:3.16.1"
from pyspark.sql.functions import lit
from pyspark.sql.types import IntegerType, StructField, StructType, TimestampType

props = {'driver': 'org.sqlite.JDBC', 'date_string_format': 'yyyy-MM-dd HH:mm:ss'}
df = spark.read.jdbc("jdbc:sqlite:db.sqlite3", "app_repostarring", properties=props)

min_stargazers_count = 2  # illustrative threshold
df = df.where(df.stargazers_count >= min_stargazers_count)
df = df.select('from_user_id', 'repo_id', 'created_at')
df = df.toDF('user', 'item', 'item_created_at')
df = df.withColumn('rating', lit(1))
df = df.select('user', 'item', 'rating', 'item_created_at')

schema = StructType([
    StructField('user', IntegerType(), nullable=False),
    StructField('item', IntegerType(), nullable=False),
    StructField('rating', IntegerType(), nullable=False),
    StructField('item_created_at', TimestampType(), nullable=False),
])
df = spark.createDataFrame(df.rdd, schema)

ref:
https://github.com/xerial/sqlite-jdbc

Setting the date_string_format property as above avoids:
java.text.ParseException: Unparseable date: "2016-04-22 17:26:54"
https://github.com/xerial/sqlite-jdbc/issues/88

Read data from parquet

from pyspark.sql.utils import AnalysisException

raw_df_filepath = 'raw_df.parquet'

try:
    raw_df = spark.read.format('parquet').load(raw_df_filepath)
except AnalysisException as exc:
    if 'Path does not exist' in exc.desc:
        raw_df = load_raw_data()
        raw_df.write.format('parquet').save(raw_df_filepath)
    else:
        raise

ref:
https://community.hortonworks.com/articles/21303/write-read-parquet-file-in-spark.html

Create a DataFrame

matrix = [
    (1, 1, 1),
    (1, 2, 1),
    (2, 1, 0),
    (3, 1, 1),
    (3, 3, 1),
    (3, 4, 1),
    (4, 1, 0),
    (4, 2, 0),
    (5, 9, 1),
    (5, 5, 0),
]
df = spark.createDataFrame(matrix, ['user', 'item', 'rating'])

ref:
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.SparkSession.createDataFrame

Create a DataFrame with explicit schema

from pyspark.sql.types import *

matrix = [
    ('Alice', 0.5, 5.0),
    ('Bob',   0.2, 92.0),
    ('Tom',   0.0, 122.0),
    ('Zack',  0.1, 1.0),
]
schema = StructType([
    StructField('name', StringType(), nullable=False),
    StructField('prediction', DoubleType(), nullable=False),
    StructField('rating', DoubleType(), nullable=False)
])
df = spark.createDataFrame(matrix, schema)
df.printSchema()

# reuse an existing DataFrame's schema for another RDD (someRDD stands for any compatible RDD)
new_df = spark.createDataFrame(someRDD, df.schema)

ref:
https://github.com/awesome-spark/spark-gotchas/blob/master/05_spark_sql_and_dataset_api.md

Create a nested schema

from pyspark.sql.functions import size
from pyspark.sql.types import ArrayType
from pyspark.sql.types import FloatType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import StructField
from pyspark.sql.types import StructType

recommendation_schema = StructType([
    StructField('item', IntegerType(), nullable=False),
    StructField('rating', FloatType(), nullable=False),
])

user_recommendations_schema = StructType([
    StructField('user', IntegerType(), nullable=False),
    StructField('recommendations', ArrayType(recommendation_schema), nullable=False),
])

matrix = [
    (1, [[100, 1.0], [200, 2.0]]),
    (2, [[300, 3.0]]),
]
df = spark.createDataFrame(matrix, user_recommendations_schema)

# add a per-user count so that it appears in the outputs below
df = df.withColumn('recommendations_count', size('recommendations'))

df.printSchema()
# root
# |-- user: integer (nullable = false)
# |-- recommendations: array (nullable = false)
# |    |-- element: struct (containsNull = true)
# |    |    |-- item: integer (nullable = false)
# |    |    |-- rating: float (nullable = false)
# |-- recommendations_count: integer (nullable = false)

df.show()
# +----+--------------------+---------------------+
# |user|     recommendations|recommendations_count|
# +----+--------------------+---------------------+
# |   1|[[100,1.0], [200,...|                    2|
# |   2|         [[300,3.0]]|                    1|
# +----+--------------------+---------------------+

ref:
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.types

Change schema of a DataFrame

from pyspark.sql.types import DoubleType, IntegerType, StructField, StructType

schema = StructType([
    StructField('user', IntegerType(), nullable=False),
    StructField('item', IntegerType(), nullable=False),
    StructField('rating', DoubleType(), nullable=False),
])
df = spark.createDataFrame(df.rdd, schema)

Get numbers of partitions

df.rdd.getNumPartitions()
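
repartition() and coalesce() change the number of partitions: repartition() performs a full shuffle while coalesce() only merges existing partitions. A minimal sketch:

# full shuffle into 200 partitions
df = df.repartition(200)

# merge down to 10 partitions without a full shuffle
df = df.coalesce(10)
df.rdd.getNumPartitions()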

Split a DataFrame into chunks (partitions)

def write_to_db(partial_rows):
    # year, week and your_mysql_insert_func come from the enclosing scope
    values_tuples = [(row.user, row.item, row.score, year, week) for row in partial_rows]
    result = your_mysql_insert_func(values_tuples)
    return [result, ]

save_result = df \
    .repartition(400) \
    .rdd \
    .mapPartitions(write_to_db) \
    .collect()

ref:
https://stackoverflow.com/questions/24898700/batching-within-an-apache-spark-rdd-map
https://stackoverflow.com/questions/35370826/using-spark-for-sequential-row-by-row-processing-without-map-and-reduce

Show a DataFrame

# print the schema in a tree format
df.printSchema()

# print top 20 rows
df.show()

# print top 100 rows
df.show(100)

# calculate the descriptive statistics
df.describe().show()
df.describe(['like_count', ]).show()

ref:
http://spark.apache.org/docs/latest/sql-programming-guide.html#untyped-dataset-operations-aka-dataframe-operations

Create a column with a literal value

from pyspark.sql.functions import lit

df = df.withColumn('like', lit(1))

ref:
http://stackoverflow.com/questions/33681487/how-do-i-add-a-new-column-to-a-spark-dataframe-using-pyspark

Return a fraction of a DataFrame

fraction = {
    'u1': 0.5,
    'u2': 0.5,
    'u3': 0.5,
    'u4': 0.5,
    'u5': 0.5,
}
df.sampleBy('user', fraction).show()
# +----+----+----------+
# |user|item|prediction|
# +----+----+----------+
# |  u1|  i1|         1|
# |  u3|  i4|         4|
# |  u4|  i1|         1|
# |  u4|  i2|         3|
# |  u5|  i5|         5|
# +----+----+----------+
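
sampleBy() samples each stratum of the given column separately; for a plain random sample, DataFrame.sample() is enough. A minimal sketch:

# take roughly 50% of the rows without replacement
sampled_df = df.sample(False, 0.5, seed=42)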

Show distinct values of a column

df.select('user').distinct().show()
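
To de-duplicate whole rows on a subset of columns instead, dropDuplicates() can be used; a minimal sketch:

# keep one arbitrary row per (user, item) pair
deduped_df = df.dropDuplicates(['user', 'item'])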

ref:
https://www.analyticsvidhya.com/blog/2016/10/spark-dataframe-and-operations/

Rename columns

df.printSchema()
# root
# |-- from_user_id: integer (nullable = true)
# |-- repo_id: integer (nullable = true)

df = df.toDF('user', 'item')

Convert a column to double type

df = df.withColumn('prediction', df['prediction'].cast('double'))
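
cast() also accepts a DataType instance instead of a type name string; an equivalent sketch:

from pyspark.sql.types import DoubleType

df = df.withColumn('prediction', df['prediction'].cast(DoubleType()))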

ref:
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.Column.cast

Update a column based on conditions

from pyspark.sql.functions import when

df = df.withColumn('label', when(df['prediction'] > 0.5, 1).otherwise(0))

ref:
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.when
http://stackoverflow.com/questions/34908448/spark-add-column-to-dataframe-conditionally

Drop rows with null values

predictions = predictions.dropna(subset=['prediction',])
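
To drop whole columns rather than rows, DataFrame.drop() takes one or more column names; a minimal sketch, where tmp_score is an illustrative column:

# tmp_score is an illustrative column name
df = df.drop('tmp_score')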

ref:
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.dropna

DataFrame subtract another DataFrame

matrix1 = [
    (1, 2, 123),
    (3, 4, 1),
    (5, 6, 45),
    (7, 8, 3424),
]
df1 = spark.createDataFrame(matrix1, ['user','item', 'play_count'])

matrix2 = [
    (3, 4),
    (5, 6),
    (9, 1),
    (7, 8),
    (1, 6),
    (1, 1),
]
df2 = spark.createDataFrame(matrix2, ['user','item'])

df2.subtract(df1.select('user', 'item')).show()
# +----+----+
# |user|item|
# +----+----+
# |   1|   1|
# |   1|   6|
# |   9|   1|
# +----+----+

# or

testing_rdd = df.rdd.subtract(training.rdd)
testing = spark.createDataFrame(testing_rdd, df.schema)

ref:
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.subtract

Convert a DataFrame column into a Python list

# list
popular_items = [row['item'] for row in popular_df.select('item').collect()]

# set
all_items = {row.id for row in als_model.itemFactors.select('id').collect()}

Concatenate (merge) two DataFrames

full_df = df1.union(df2)
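
Note that union() matches columns by position, not by name. On Spark 2.3+, unionByName() matches by name instead; a minimal sketch:

# columns are matched by name, so a different column order in df2 is fine
full_df = df1.unionByName(df2)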

Convert a DataFrame to a Python dict

d = df.rdd.collectAsMap()
d['some_key']
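
collectAsMap() expects exactly two columns: the first becomes the key, the second the value. A minimal sketch, assuming raw_df has repo_id and repo_url columns as in the SQLite example:

url_by_repo_id = raw_df.select('repo_id', 'repo_url').rdd.collectAsMap()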

Compute (approximate or exact) median of a numerical column

# approxQuantile() returns a list with one value per requested quantile
approximate_median = df.approxQuantile('count', [0.5, ], 0.25)[0]
exact_median = df.approxQuantile('count', [0.5, ], 0.0)[0]

maximum = df.approxQuantile('count', [1.0, ], 0.1)[0]
minimum = df.approxQuantile('count', [0.0, ], 0.1)[0]

ref:
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.approxQuantile

Find frequent items for columns

df = rating_df.freqItems(['item', ], support=0.01)
# +--------------------+
# |      item_freqItems|
# +--------------------+
# |[194512, 371798, ...|
# +--------------------+

# get the value of a DataFrame column
popular_items = df.collect()[0].item_freqItems

ref:
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.freqItems

Broadcast a value

bc_candidates = sc.broadcast(set([1, 2, 4, 5, 8]))
print(bc_candidates.value)
# {8, 1, 2, 4, 5}

bc_df = sc.broadcast(df.collect())
df = spark.createDataFrame(bc_df.value)

ref:
http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables

Broadcast a DataFrame in join

import pyspark.sql.functions as F

large_df.join(F.broadcast(small_df), 'some_key')

ref:
http://stackoverflow.com/questions/34053302/pyspark-and-broadcast-join-example
https://chapeau.freevariable.com/2014/09/improving-spark-application-performance.html

Cache a DataFrame

df.cache()
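
cache() is shorthand for persist() with the default storage level; a minimal sketch of choosing an explicit level and releasing it:

from pyspark import StorageLevel

df.persist(StorageLevel.MEMORY_AND_DISK)

# release the cached data once it is no longer needed
df.unpersist()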

ref:
http://stackoverflow.com/questions/38056774/spark-cache-vs-broadcast

Show query execution plan

df.explain(extended=True)
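
On Spark 3.0+, explain() also accepts a mode string; a minimal sketch:

# one of 'simple', 'extended', 'codegen', 'cost', 'formatted'
df.explain(mode='formatted')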

Use SQL with a DataFrame

props = {'driver': 'org.sqlite.JDBC', 'date_string_format': 'yyyy-MM-dd HH:mm:ss'}
df = spark.read.jdbc("jdbc:sqlite:db.sqlite3", "app_repostarring", properties=props)
df.createOrReplaceTempView('repo_stars')

query = 'SELECT DISTINCT repo_id AS item FROM repo_stars WHERE stargazers_count > 1000'
df2 = spark.sql(query)
df2.show()

query = """
SELECT
    from_user_id AS user,
    count(repo_id) AS count
FROM repo_stars
GROUP BY from_user_id
ORDER BY count DESC
"""
df = spark.sql(query)

params = {'top_n': top_n}
query = """
SELECT
    repo_id AS item,
    MAX(stargazers_count) AS stars
FROM repo_stars
GROUP BY repo_id
ORDER BY stars DESC
LIMIT {top_n}
""".format(**params)
popular_df = spark.sql(query)

ref:
https://sparkour.urizone.net/recipes/using-sql-udf/

WHERE ... IN ...

from pyspark.sql.functions import col

item_ids = [1, 2, 5, 8]
rows = raw_df \
    .where(col('repo_id').isin(item_ids)) \
    .select('repo_url') \
    .collect()
repo_urls = [row.repo_url for row in rows]

ORDER BY ... multiple columns

import pyspark.sql.functions as F

rating_df = raw_df \
    .selectExpr('from_user_id AS user', 'repo_id AS item', '1 AS rating', 'starred_at') \
    .orderBy('user', F.col('starred_at').desc())

Aggregate

import pyspark.sql.functions as F

df.agg(F.min('user'), F.max('user'), F.min('item'), F.max('item')).show()

max_value = user_star_count_df.agg(F.max('stars').alias('max_stars')).collect()[0]['max_stars']

SELECT COUNT(DISTINCT xxx) ...

import pyspark.sql.functions as F

matrix = [
    (1, 1, 1),
    (1, 2, 1),
    (2, 1, 0),
    (3, 1, 1),
    (3, 3, 1),
    (3, 4, 1),
    (4, 1, 0),
    (4, 2, 0),
    (5, 9, 1),
    (5, 5, 0),
]
df = spark.createDataFrame(matrix, ['user', 'item', 'rating'])
df.agg(F.countDistinct('user')).show()
# +--------------------+
# |count(DISTINCT user)|
# +--------------------+
# |                   5|
# +--------------------+

ref:
http://stackoverflow.com/questions/40888946/spark-dataframe-count-distinct-values-of-every-column

SELECT MAX(xxx) ... GROUP BY

df.groupBy('user').count().filter('count >= 4').show()

popular_df = raw_df \
    .groupBy('repo_id') \
    .agg(F.max('stargazers_count').alias('stars')) \
    .orderBy('stars', ascending=False) \
    .limit(top_n)

ref:
http://stackoverflow.com/questions/30616380/spark-how-to-count-number-of-records-by-key

SELECT COUNT() ... GROUP BY ...

prediction_df.groupBy('user').count().show()
# +------+-----+
# |  user|count|
# +------+-----+
# |649661|   15|
# |464340|   15|
# |468780|   15|
# |489233|   11|
# |455143|   14|
# +------+-----+

stargazers_count_df = rating_df \
    .groupBy('item') \
    .agg(F.count('user').alias('stargazers_count')) \
    .orderBy('stargazers_count', ascending=False)
# +--------+----------------+
# |    item|stargazers_count|
# +--------+----------------+
# | 3544424|             137|
# | 2126244|             120|
# | 7691631|             115|
# | 1362490|             112|
# |  943149|             112|
# +--------+----------------+

starred_count_df = rating_df \
    .groupBy('user') \
    .agg(F.count('item').alias('starred_count')) \
    .orderBy('starred_count', ascending=False)
# +-------+-------------+
# |   user|starred_count|
# +-------+-------------+
# |  48936|         7201|
# |4560482|         5898|
# |3382565|         3978|
# |  10652|         3586|
# |  31996|         3459|
# +-------+-------------+

For large DataFrames, you may want to use approx_count_distinct() instead of an exact distinct count.
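
A minimal sketch of approx_count_distinct(), assuming the rating_df used above:

import pyspark.sql.functions as F

# rsd is the maximum allowed relative standard deviation of the estimate
rating_df.agg(F.approx_count_distinct('user', rsd=0.05)).show()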

GROUP_CONCAT a column

from pyspark.sql.functions import expr

per_user_predictions_df = output_df \
    .orderBy(['user', 'prediction'], ascending=False) \
    .groupBy('user') \
    .agg(expr('collect_list(item) as items'))
# +--------+--------------------+
# |    user|               items|
# +--------+--------------------+
# |    2142|[36560369, 197450...|
# |   47217|[40693501, 643554...|
# +--------+--------------------+

ref:
http://stackoverflow.com/questions/31640729/spark-sql-replacement-for-mysql-group-concat-aggregate-function

GROUP_CONCAT multiple columns

from pyspark.sql.functions import col, collect_list, struct

matrix = [
    (1, 1, 0.1),
    (1, 2, 5.1),
    (1, 6, 0.0),
    (2, 6, 9.3),
    (3, 1, 0.54),
    (3, 5, 0.83),
    (4, 1, 0.65),
    (4, 4, 1.023),
]
df = spark.createDataFrame(matrix, ['user', 'item', 'prediction'])

df \
    .groupBy("user") \
    .agg(collect_list(struct(col('item'), col('prediction'))).alias("recommendations")) \
    .show(truncate=False)
# +----+---------------------------+
# |user|recommendations            |
# +----+---------------------------+
# |1   |[[1,0.1], [2,5.1], [6,0.0]]|
# |3   |[[1,0.54], [5,0.83]]       |
# |2   |[[6,9.3]]                  |
# |4   |[[1,0.65], [4,1.023]]      |
# +----+---------------------------+

ref:
https://stackoverflow.com/questions/37737843/aggregating-multiple-columns-with-custom-function-in-spark

SELECT ... RANK() OVER (PARTITION BY ... ORDER BY ... ASC)

from pyspark.sql import Window
from pyspark.sql.functions import col
from pyspark.sql.functions import expr
import pyspark.sql.functions as F

window_spec = Window.partitionBy('from_user_id').orderBy(col('starred_at').desc())
per_user_actual_items_df = raw_df \
    .select('from_user_id', 'repo_id', 'starred_at', F.rank().over(window_spec).alias('rank')) \
    .where('rank <= 10') \
    .groupBy('from_user_id') \
    .agg(expr('collect_list(repo_id) as items')) \
    .withColumnRenamed('from_user_id', 'user')
# +--------+--------------------+
# |    user|               items|
# +--------+--------------------+
# |    2142|[29122050, 634846...|
# |   59990|[9820191, 8729416...|
# +--------+--------------------+

window_spec = Window.partitionBy('user').orderBy(col('prediction').desc())
per_user_predicted_items_df = output_df \
    .select('user', 'item', 'prediction', F.rank().over(window_spec).alias('rank')) \
    .where('rank <= 10') \
    .groupBy('user') \
    .agg(expr('collect_list(item) as items'))
# +--------+--------------------+
# |    user|               items|
# +--------+--------------------+
# |    2142|[36560369, 197450...|
# |   47217|[40693501, 643554...|
# +--------+--------------------+

ref:
https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
https://github.com/awesome-spark/spark-gotchas/blob/master/05_spark_sql_and_dataset_api.md#window-functions

Left anti join / Left excluding join

clean_df = rating_df.join(to_remove_items, 'item', 'left_anti')
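
A self-contained sketch of left_anti with illustrative data, keeping only the ratings whose item is not in the blacklist:

ratings = spark.createDataFrame([
    (1, 100, 1),
    (1, 200, 1),
    (2, 300, 1),
], ['user', 'item', 'rating'])
blacklist = spark.createDataFrame([(200, ), (999, )], ['item'])

# only the rows for items 100 and 300 remain
ratings.join(blacklist, 'item', 'left_anti').show()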

ref:
https://www.codeproject.com/Articles/33052/Visual-Representation-of-SQL-Joins

Outer join

m1 = [
    (1, 1, 1),
    (1, 2, 1),
    (1, 4, 1),
    (2, 2, 1),
    (2, 3, 1),
    (3, 5, 1),
]
m1df = spark.createDataFrame(m1, ['user', 'item', 'rating'])
m1df = m1df.where('user = 1').alias('m1df')

m2 = [
    (1, 100),
    (2, 200),
    (3, 300),
    (4, 400),
    (5, 500),
    (6, 600),
]
m2df = spark.createDataFrame(m2, ['item', 'count'])
m2df = m2df.alias('m2df')

m1df.join(m2df, m1df.item == m2df.item, 'rightouter') \
    .where('m1df.user IS NULL') \
    .orderBy('m2df.count', ascending=False) \
    .selectExpr('1 AS user', 'm2df.item', '0 AS rating') \
    .show()
# +----+----+------+
# |user|item|rating|
# +----+----+------+
# |   1|   6|     0|
# |   1|   5|     0|
# |   1|   3|     0|
# +----+----+------+

ref:
https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-sql-joins.html

Cross join

df = m1df.select('user').distinct().crossJoin(m2df.select('item'))
df.show()

query = """
SELECT f1.user, f2.item, 0 AS rating
FROM f1
CROSS JOIN f2
"""
df = spark.sql(query)
df.show()

all_user_item_pair_df = als_model.userFactors.selectExpr('id AS user') \
    .crossJoin(als_model.itemFactors.selectExpr('id AS item'))
# +----+----+
# |user|item|
# +----+----+
# |xxxx|oooo|
# +----+----+

ref:
http://stackoverflow.com/questions/5464131/finding-pairs-that-do-not-exist-in-a-different-table