Access SparkSession
from pyspark.sql import SparkSession
# get the default SparkSession instance
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
sc.setLogLevel('INFO')
Access custom configurations
$ spark-submit \
--master spark://localhost:7077 \
--properties-file my-properties.conf \
your_spark_app.py
# in my-properties.conf
# spark.myapp.db_host 192.168.100.10
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
print(sc.getConf().get('spark.myapp.db_host'))
ref:
https://stackoverflow.com/questions/31115881/how-to-load-java-properties-file-and-use-in-spark
Create a RDD
# Turn a small local list of integers into an RDD.
array = [1, 5, 7, 2, 3]
rdd = sc.parallelize(array)
# The schema here is a type-name string; the resulting single column is
# named "value", as the table printed below shows.
df = rdd.toDF('int')
# +-----+
# |value|
# +-----+
# | 1|
# | 5|
# | 7|
# | 2|
# | 3|
# +-----+
Read data from MySQL
$ pyspark --packages "mysql:mysql-connector-java:5.1.41"
url = 'jdbc:mysql://127.0.0.1:3306/albedo'
properties = {'user': 'root', 'password': '123'}
spark.read.jdbc(url, 'app_repostarring', properties=properties)
ref:
https://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases
https://dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-configuration-properties.html
Write data to MySQL
CREATE TABLE `recommender_songrecommend` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`user_id` int(11) NOT NULL,
`song_id` int(11) NOT NULL,
`score` double NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1;
# Sample (user, item, score) triples to persist.
matrix = [
    (1, 2, 0.1),
    (3, 4, 0.23456),
    (5, 6, 7.89),
    (7, 8, -10.111213),
]
# The columns are named user/item here because the selectExpr below renames
# them to the table's user_id/song_id columns; creating the DataFrame with
# the target names directly would make 'user' and 'item' unresolvable.
df = spark.createDataFrame(matrix, ['user', 'item', 'score'])
url = 'jdbc:mysql://192.168.11.200:3306/DATABASE?user=USER&password=PASSWORD&verifyServerCertificate=false&useSSL=false&rewriteBatchedStatements=true'
properties = {'driver': 'com.mysql.jdbc.Driver'}
# mode='append' adds rows to the existing table.
df \
    .selectExpr('user AS user_id', 'item AS song_id', 'score') \
    .write.jdbc(url, table='recommender_songrecommend', mode='append', properties=properties)
ref:
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter.jdbc
https://stackoverflow.com/questions/2993251/jdbc-batch-insert-performance/10617768#10617768
Read data from SQLite
$ pyspark --packages "org.xerial:sqlite-jdbc:3.16.1"
from pyspark.sql.functions import lit
from pyspark.sql.types import IntegerType, StructField, StructType, TimestampType
# date_string_format tells the SQLite JDBC driver how timestamps are stored
# (see the ParseException note in the references below).
props = {'driver': 'org.sqlite.JDBC', 'date_string_format': 'yyyy-MM-dd HH:mm:ss'}
df = spark.read.jdbc("jdbc:sqlite:db.sqlite3", "app_repostarring", properties=props)
# min_stargazers_count is not defined in this snippet; supply your own threshold.
df = df.where(df.stargazers_count >= min_stargazers_count)
df = df.select('from_user_id', 'repo_id', 'created_at')
# toDF needs exactly one name per selected column (three here).
df = df.toDF('user', 'item', 'item_created_at')
df = df.withColumn('rating', lit(1))
# Reorder the columns so they line up positionally with the schema fields below.
df = df.select('user', 'item', 'rating', 'item_created_at')
schema = StructType([
    StructField('user', IntegerType(), nullable=False),
    StructField('item', IntegerType(), nullable=False),
    StructField('rating', IntegerType(), nullable=False),
    StructField('item_created_at', TimestampType(), nullable=False),
])
# Rebuild the DataFrame from its RDD to apply the explicit (non-nullable) schema.
df = spark.createDataFrame(df.rdd, schema)
ref:
https://github.com/xerial/sqlite-jdbc
java.text.ParseException: Unparseable date: "2016-04-22 17:26:54"
https://github.com/xerial/sqlite-jdbc/issues/88
Read data from parquet
from pyspark.sql.utils import AnalysisException

# Load the cached parquet file; when it does not exist yet, build the data
# with load_raw_data() (defined elsewhere) and write the cache.
raw_df_filepath = 'raw_df.parquet'
try:
    raw_df = spark.read.format('parquet').load(raw_df_filepath)
except AnalysisException as exc:
    # Match on str(exc) instead of the `desc` attribute.
    # NOTE(review): `desc` appears to be unavailable on newer PySpark
    # releases - confirm against the version in use.
    if 'Path does not exist' not in str(exc):
        # Any other analysis error is unexpected: re-raise it unchanged.
        raise
    raw_df = load_raw_data()
    raw_df.write.format('parquet').save(raw_df_filepath)
ref:
https://community.hortonworks.com/articles/21303/write-read-parquet-file-in-spark.html
Create a DataFrame
matrix = [
(1, 1, 1),
(1, 2, 1),
(2, 1, 0),
(3, 1, 1),
(3, 3, 1),
(3, 4, 1),
(4, 1, 0),
(4, 2, 0),
(5, 9, 1),
(5, 5, 0),
]
df = spark.createDataFrame(matrix, ['user', 'item', 'rating'])
Create a DataFrame with explicit schema
# Import only the types used below instead of a star import, so the snippet
# does not flood the namespace with every pyspark.sql.types name.
from pyspark.sql.types import DoubleType, StringType, StructField, StructType

matrix = [
    ('Alice', 0.5, 5.0),
    ('Bob', 0.2, 92.0),
    ('Tom', 0.0, 122.0),
    ('Zack', 0.1, 1.0),
]
# One StructField per tuple position: name, prediction, rating.
schema = StructType([
    StructField('name', StringType(), nullable=False),
    StructField('prediction', DoubleType(), nullable=False),
    StructField('rating', DoubleType(), nullable=False),
])
df = spark.createDataFrame(matrix, schema)
df.printSchema()
# The schema of an existing DataFrame can be reused for other data
# (someRDD is a placeholder for your own RDD).
new_df = spark.createDataFrame(someRDD, df.schema)
ref:
https://github.com/awesome-spark/spark-gotchas/blob/master/05_spark_sql_and_dataset_api.md
Create a nested schema
from pyspark.sql.types import ArrayType
from pyspark.sql.types import FloatType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import StructField
from pyspark.sql.types import StructType
# Schema of a single recommendation: an (item, rating) struct.
recommendation_schema = StructType([
StructField('item', IntegerType(), nullable=False),
StructField('rating', FloatType(), nullable=False),
])
# Schema of a row: a user plus an array of recommendation structs.
user_recommendations_schema = StructType([
StructField('user', IntegerType(), nullable=False),
StructField('recommendations', ArrayType(recommendation_schema), nullable=False),
])
# Each inner [item, rating] list fills one recommendation struct.
matrix = [
(1, [[100, 1.0], [200, 2.0]]),
(2, [[300, 3.0]]),
]
df = spark.createDataFrame(matrix, user_recommendations_schema)
df.printSchema()
# root
# |-- user: integer (nullable = false)
# |-- recommendations: array (nullable = false)
# | |-- element: struct (containsNull = true)
# | | |-- item: integer (nullable = false)
# | | |-- rating: float (nullable = false)
df.show()
# +----+--------------------+
# |user| recommendations|
# +----+--------------------+
# | 1|[[100,1.0], [200,...|
# | 2| [[300,3.0]]|
# +----+--------------------+
ref:
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.types
Change schema of a DataFrame
from pyspark.sql.types import DoubleType, IntegerType, StructField, StructType
schema = StructType([
StructField('user', IntegerType(), nullable=False),
StructField('item', IntegerType(), nullable=False),
StructField('rating', DoubleType(), nullable=False),
])
df = spark.createDataFrame(df.rdd, schema)
Get numbers of partitions
df.rdd.getNumPartitions()
Split a DataFrame into chunks (partitions)
def write_to_db(partial_rows):
    """Insert all rows of one partition with a single batch call.

    Intended to be passed to ``mapPartitions`` (see the call below), so
    ``partial_rows`` is an iterable of rows exposing ``user``, ``item`` and
    ``score`` attributes.

    ``year``, ``week`` and ``your_mysql_insert_func`` are not defined in
    this snippet; they must exist in the enclosing scope.

    Returns a one-element list holding whatever ``your_mysql_insert_func``
    returned, so the caller gets one result per partition.
    """
    # Iterate the function's own parameter (the original referenced an
    # undefined name here).
    values_tuples = [(row.user, row.item, row.score, year, week) for row in partial_rows]
    result = your_mysql_insert_func(values_tuples)
    return [result, ]
# Repartition into 400 chunks, run write_to_db once per chunk, and gather
# the per-chunk results into a list.
save_result = (
    df.repartition(400)
    .rdd
    .mapPartitions(write_to_db)
    .collect()
)
ref:
https://stackoverflow.com/questions/24898700/batching-within-an-apache-spark-rdd-map
https://stackoverflow.com/questions/35370826/using-spark-for-sequential-row-by-row-processing-without-map-and-reduce
Show a DataFrame
# print the schema in a tree format
df.printSchema()
# print top 20 rows
df.show()
# print top 100 rows
df.show(100)
# calculate the descriptive statistics
df.describe().show()
df.describe(['like_count', ]).show()
Create a column with a literal value
from pyspark.sql.functions import lit
df = df.withColumn('like', lit(1))
Return a fraction of a DataFrame
# Sampling fraction per value of the 'user' column: 0.5 for each of u1..u5.
fraction = {user: 0.5 for user in ('u1', 'u2', 'u3', 'u4', 'u5')}
df.sampleBy('user', fraction).show()
# +----+----+----------+
# |user|item|prediction|
# +----+----+----------+
# | u1| i1| 1|
# | u3| i4| 4|
# | u4| i1| 1|
# | u4| i2| 3|
# | u5| i5| 5|
# +----+----+----------+
Show distinct values of a column
df.select('user').distinct().show()
ref:
https://www.analyticsvidhya.com/blog/2016/10/spark-dataframe-and-operations/
Rename columns
df.printSchema()
# root
# |-- from_user_id: integer (nullable = true)
# |-- repo_id: integer (nullable = true)
df = df.toDF('user', 'item')
# or
df = df.withColumnRenamed('from_user_id', 'user').withColumnRenamed('repo_id', 'item')
Convert a column to double type
df = df.withColumn('prediction', df['prediction'].cast('double'))
ref:
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.Column.cast
Update a column based on conditions
from pyspark.sql.functions import when
df = df.withColumn('label', when(df['prediction'] > 0.5, 1).otherwise(0))
ref:
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.when
http://stackoverflow.com/questions/34908448/spark-add-column-to-dataframe-conditionally
Drop rows with null values from a DataFrame
# Drop every row whose 'prediction' value is null.
predictions = predictions.dropna(subset=['prediction', ])
ref:
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.dropna
DataFrame subtract another DataFrame
matrix1 = [
(1, 2, 123),
(3, 4, 1),
(5, 6, 45),
(7, 8, 3424),
]
df1 = spark.createDataFrame(matrix1, ['user','item', 'play_count'])
matrix2 = [
(3, 4),
(5, 6),
(9, 1),
(7, 8),
(1, 6),
(1, 1),
]
df2 = spark.createDataFrame(matrix2, ['user','item'])
df2.subtract(df1.select('user', 'item')).show()
# +----+----+
# |user|item|
# +----+----+
# | 1| 1|
# | 1| 6|
# | 9| 1|
# +----+----+
# or
testing_rdd = df.rdd.subtract(training.rdd)
testing = spark.createDataFrame(testing_rdd, df.schema)
ref:
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.subtract
Convert a DataFrame column into a Python list
# list: one Python value per collected row of the 'item' column
popular_items = [row['item'] for row in popular_df.select('item').collect()]
# set: the ids found in the ALS model's itemFactors DataFrame
all_items = {row.id for row in als_model.itemFactors.select('id').collect()}
Concatenate (merge) two DataFrames
full_df = df1.union(df2)
Convert a DataFrame to a Python dict
d = df.rdd.collectAsMap()
d['some_key']
Compute (approximate or exact) median of a numerical column
approximate_median = df.approxQuantile('count', [0.5, ], 0.25)
exact_median = df.approxQuantile('count', [0.5, ], 0.0)
maximum = df.approxQuantile('count', [1.0, ], 0.1)
minimum = df.approxQuantile('count', [0.0, ], 0.1)
ref:
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.approxQuantile
Find frequent items for columns
df = rating_df.freqItems(['item', ], support=0.01)
# +--------------------+
# | item_freqItems|
# +--------------------+
# |[194512, 371798, ...|
# +--------------------+
# get the value of a DataFrame column
popular_items = df.collect()[0].item_freqItems
ref:
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.freqItems
Broadcast a value
# Wrap a plain Python set in a broadcast variable; read it back via .value.
bc_candidates = sc.broadcast({1, 2, 4, 5, 8})
print(bc_candidates.value)
# {8, 1, 2, 4, 5}
# Collect the DataFrame's rows, broadcast them, then rebuild a DataFrame
# from the broadcast value.
bc_df = sc.broadcast(df.collect())
df = spark.createDataFrame(bc_df.value)
ref:
http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables
Broadcast a DataFrame in join
import pyspark.sql.functions as F
large_df.join(F.broadcast(small_df), 'some_key')
ref:
http://stackoverflow.com/questions/34053302/pyspark-and-broadcast-join-example
https://chapeau.freevariable.com/2014/09/improving-spark-application-performance.html
Cache a DataFrame
df.cache()
ref:
http://stackoverflow.com/questions/38056774/spark-cache-vs-broadcast
Show query execution plan
df.explain(extended=True)
Use SQL to query a DataFrame
props = {'driver': 'org.sqlite.JDBC', 'date_string_format': 'yyyy-MM-dd HH:mm:ss'}
df = spark.read.jdbc("jdbc:sqlite:db.sqlite3", "app_repostarring", properties=props)
df.createOrReplaceTempView('repo_stars')
query = 'SELECT DISTINCT repo_id AS item FROM repo_stars WHERE stargazers_count > 1000'
df2 = spark.sql(query)
df2.show()
query = """
SELECT
from_user_id AS user,
count(repo_id) AS count
FROM repo_stars
GROUP BY from_user_id
ORDER BY count DESC
"""
df = spark.sql(query)
params = {'top_n': top_n}
query = """
SELECT
repo_id AS item,
MAX(stargazers_count) AS stars
FROM repo_stars
GROUP BY repo_id
ORDER BY stars DESC
LIMIT {top_n}
""".format(**params)
popular_df = spark.sql(query)
ref:
https://sparkour.urizone.net/recipes/using-sql-udf/
WHERE ... IN ...
from pyspark.sql.functions import col

item_ids = [1, 2, 5, 8]
# collectAsMap() is an RDD method that needs (key, value) pairs, so select
# two columns and go through the DataFrame's underlying RDD.
# The result is a dict mapping repo_id to repo_url.
raw_df \
    .where(col('repo_id').isin(item_ids)) \
    .select('repo_id', 'repo_url') \
    .rdd \
    .collectAsMap()
ORDER BY multiple columns
import pyspark.sql.functions as F
rating_df = raw_df \
.selectExpr('from_user_id AS user', 'repo_id AS item', '1 AS rating', 'starred_at') \
.orderBy('user', F.col('starred_at').desc())
Aggregate
import pyspark.sql.functions as F

# Use Spark's column aggregates (F.min / F.max); Python's builtin min/max
# would just compare the column-name strings.
df.agg(F.min('user'), F.max('user'), F.min('item'), F.max('item')).show()
# Alias the aggregate so the value can be read back under a known name
# instead of guessing the auto-generated column name.
max_value = user_star_count_df.agg(F.max('stars').alias('max_stars')).collect()[0]['max_stars']
SELECT COUNT(DISTINCT xxx) ...
import pyspark.sql.functions as F
matrix = [
(1, 1, 1),
(1, 2, 1),
(2, 1, 0),
(3, 1, 1),
(3, 3, 1),
(3, 4, 1),
(4, 1, 0),
(4, 2, 0),
(5, 9, 1),
(5, 5, 0),
]
df = spark.createDataFrame(matrix, ['user', 'item', 'rating'])
df.agg(F.countDistinct('user')).show()
# +--------------------+
# |count(DISTINCT user)|
# +--------------------+
# | 5|
# +--------------------+
ref:
http://stackoverflow.com/questions/40888946/spark-dataframe-count-distinct-values-of-every-column
SELECT MAX(xxx) ... GROUP BY
df.groupBy('user').count().filter('count >= 4').show()
# Highest stargazers_count seen per repo, largest first, limited to top_n rows.
popular_df = (
    raw_df
    .groupBy('repo_id')
    .agg(F.max('stargazers_count').alias('stars'))
    .orderBy('stars', ascending=False)
    .limit(top_n)
)
ref:
http://stackoverflow.com/questions/30616380/spark-how-to-count-number-of-records-by-key
SELECT COUNT() ... GROUP BY
prediction_df.groupBy('user').count().show()
# +------+-----+
# | user|count|
# +------+-----+
# |649661| 15|
# |464340| 15|
# |468780| 15|
# |489233| 11|
# |455143| 14|
# +------+-----+
stargazers_count_df = rating_df \
.groupBy('item') \
.agg(F.count('user').alias('stargazers_count')) \
.orderBy('stargazers_count', ascending=False)
# +--------+----------------+
# | item|stargazers_count|
# +--------+----------------+
# | 3544424| 137|
# | 2126244| 120|
# | 7691631| 115|
# | 1362490| 112|
# | 943149| 112|
# +--------+----------------+
starred_count_df = rating_df \
.groupBy('user') \
.agg(F.count('item').alias('starred_count')) \
.orderBy('starred_count', ascending=False)
# +-------+-------------+
# | user|starred_count|
# +-------+-------------+
# | 48936| 7201|
# |4560482| 5898|
# |3382565| 3978|
# | 10652| 3586|
# | 31996| 3459|
# +-------+-------------+
You may want to use approx_count_distinct.
GROUP_CONCAT a column
from pyspark.sql.functions import expr
# Gather each user's items into a single list column named "items".
# NOTE(review): the orderBy runs before the groupBy; Spark documents
# collect_list ordering as non-deterministic after a shuffle, so the sort
# order inside each list is presumably not guaranteed - confirm, or sort
# inside the aggregation if order matters.
per_user_predictions_df = output_df \
.orderBy(['user', 'prediction'], ascending=False) \
.groupBy('user') \
.agg(expr('collect_list(item) as items'))
# +--------+--------------------+
# | user| items|
# +--------+--------------------+
# | 2142|[36560369, 197450...|
# | 47217|[40693501, 643554...|
# +--------+--------------------+
GROUP_CONCAT multiple columns
from pyspark.sql.functions import col, collect_list, struct
matrix = [
(1, 1, 0.1),
(1, 2, 5.1),
(1, 6, 0.0),
(2, 6, 9.3),
(3, 1, 0.54),
(3, 5, 0.83),
(4, 1, 0.65),
(4, 4, 1.023),
]
df = spark.createDataFrame(matrix, ['user', 'item', 'prediction'])
df \
.groupBy("user") \
.agg(collect_list(struct(col('item'), col('prediction'))).alias("recommendations")) \
.show(truncate=False)
# +----+---------------------------+
# |user|recommendations |
# +----+---------------------------+
# |1 |[[1,0.1], [2,5.1], [6,0.0]]|
# |3 |[[1,0.54], [5,0.83]] |
# |2 |[[6,9.3]] |
# |4 |[[1,0.65], [4,1.023]] |
# +----+---------------------------+
SELECT ... RANK() OVER (PARTITION BY ... ORDER BY)
from pyspark.sql import Window
from pyspark.sql.functions import col
from pyspark.sql.functions import expr
import pyspark.sql.functions as F
window_spec = Window.partitionBy('from_user_id').orderBy(col('starred_at').desc())
per_user_actual_items_df = raw_df \
.select('from_user_id', 'repo_id', 'starred_at', F.rank().over(window_spec).alias('rank')) \
.where('rank <= 10') \
.groupBy('from_user_id') \
.agg(expr('collect_list(repo_id) as items')) \
.withColumnRenamed('from_user_id', 'user')
# +--------+--------------------+
# | user| items|
# +--------+--------------------+
# | 2142|[29122050, 634846...|
# | 59990|[9820191, 8729416...|
# +--------+--------------------+
window_spec = Window.partitionBy('user').orderBy(col('prediction').desc())
per_user_predicted_items_df = output_df \
.select('user', 'item', 'prediction', F.rank().over(window_spec).alias('rank')) \
.where('rank <= 10') \
.groupBy('user') \
.agg(expr('collect_list(item) as items'))
# +--------+--------------------+
# | user| items|
# +--------+--------------------+
# | 2142|[36560369, 197450...|
# | 47217|[40693501, 643554...|
# +--------+--------------------+
ref:
https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
https://github.com/awesome-spark/spark-gotchas/blob/master/05_spark_sql_and_dataset_api.md#window-functions
Left anti join / Left excluding join
clean_df = rating_df.join(to_remove_items, 'item', 'left_anti')
ref:
https://www.codeproject.com/Articles/33052/Visual-Representation-of-SQL-Joins
Outer join
m1 = [
(1, 1, 1),
(1, 2, 1),
(1, 4, 1),
(2, 2, 1),
(2, 3, 1),
(3, 5, 1),
]
m1df = spark.createDataFrame(m1, ['user', 'item', 'rating'])
m1df = m1df.where('user = 1').alias('m1df')
m2 = [
(1, 100),
(2, 200),
(3, 300),
(4, 400),
(5, 500),
(6, 600),
]
m2df = spark.createDataFrame(m2, ['item', 'count'])
m2df = m2df.alias('m2df')
# Right outer join keeps every m2df row; filtering on a NULL m1df.user
# leaves the m2df items with no m1df match, which are then listed by
# descending count as (user=1, item, rating=0) rows.
(
    m1df.join(m2df, m1df.item == m2df.item, 'rightouter')
    .where('m1df.user IS NULL')
    .orderBy('m2df.count', ascending=False)
    .selectExpr('1 AS user', 'm2df.item', '0 AS rating')
    .show()
)
# +----+----+------+
# |user|item|rating|
# +----+----+------+
# | 1| 6| 0|
# | 1| 5| 0|
# | 1| 3| 0|
# +----+----+------+
ref:
https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-sql-joins.html
Cross join
df = m1df.select('user').distinct().crossJoin(m2df.select('item'))
df.show()
# spark.sql can only see registered views, so expose the two inputs under
# the names the query uses (f1: distinct users, f2: items).
m1df.select('user').distinct().createOrReplaceTempView('f1')
m2df.select('item').createOrReplaceTempView('f2')
query = """
SELECT f1.user, f2.item, 0 AS rating
FROM f1
CROSS JOIN f2
"""
df = spark.sql(query)
df.show()
# Pair every user id with every item id from the same ALS model
# (both sides now use the one name, als_model).
all_user_item_pair_df = als_model.userFactors.selectExpr('id AS user') \
    .crossJoin(als_model.itemFactors.selectExpr('id AS item'))
# +----+----+
# |user|item|
# +----+----+
# |xxxx|oooo|
# +----+----+
ref:
http://stackoverflow.com/questions/5464131/finding-pairs-that-do-not-exist-in-a-different-table