{"id":382,"date":"2017-05-10T19:34:32","date_gmt":"2017-05-10T11:34:32","guid":{"rendered":"http:\/\/vinta.ws\/code\/?p=382"},"modified":"2026-02-18T01:20:35","modified_gmt":"2026-02-17T17:20:35","slug":"spark-sql-cookbook-pyspark","status":"publish","type":"post","link":"https:\/\/vinta.ws\/code\/spark-sql-cookbook-pyspark.html","title":{"rendered":"Spark SQL cookbook (Python)"},"content":{"rendered":"<h2>Access SparkSession<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-py\">from pyspark.sql import SparkSession\n\n# get the default SparkSession instance\nspark = SparkSession.builder.getOrCreate()\n\nsc = spark.sparkContext\nsc.setLogLevel('INFO')<\/code><\/pre>\n<h2>Access custom configurations<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-bash\">$ spark-submit \n--master spark:\/\/localhost:7077 \n--properties-file my-properties.conf \nyour_spark_app.py\n\n# in my-properties.conf\n# spark.myapp.db_host 192.168.100.10<\/code><\/pre>\n<pre class=\"line-numbers\"><code class=\"language-py\">from pyspark.sql import SparkSession\n\nspark = SparkSession.builder.getOrCreate()\nsc = spark.sparkContext\nprint(sc.getConf().get('spark.myapp.db_host'))<\/code><\/pre>\n<p>ref:<br \/>\n<a href=\"https:\/\/stackoverflow.com\/questions\/31115881\/how-to-load-java-properties-file-and-use-in-spark\">https:\/\/stackoverflow.com\/questions\/31115881\/how-to-load-java-properties-file-and-use-in-spark<\/a><\/p>\n<h2>Create a RDD<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-py\">array = [\n    1,\n    5,\n    7,\n    2,\n    3,\n]\nrdd = sc.parallelize(array)\ndf = rdd.toDF('int')\n# +-----+\n# |value|\n# +-----+\n# |    1|\n# |    5|\n# |    7|\n# |    2|\n# |    3|\n# +-----+<\/code><\/pre>\n<h2>Read data from MySQL<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-bash\">$ pyspark --packages \"mysql:mysql-connector-java:5.1.41\"<\/code><\/pre>\n<pre class=\"line-numbers\"><code class=\"language-py\">url = 'jdbc:mysql:\/\/127.0.0.1:3306\/albedo'\nproperties = {'user': 'root', 'password': '123'}\nspark.read.jdbc(url, 'app_repostarring', properties=properties)<\/code><\/pre>\n<p>ref:<br \/>\n<a href=\"https:\/\/spark.apache.org\/docs\/latest\/sql-programming-guide.html#jdbc-to-other-databases\">https:\/\/spark.apache.org\/docs\/latest\/sql-programming-guide.html#jdbc-to-other-databases<\/a><br \/>\n<a href=\"https:\/\/dev.mysql.com\/doc\/connector-j\/5.1\/en\/connector-j-reference-configuration-properties.html\">https:\/\/dev.mysql.com\/doc\/connector-j\/5.1\/en\/connector-j-reference-configuration-properties.html<\/a><\/p>\n<h2>Write data to MySQL<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-sql\">CREATE TABLE <code>recommender_songrecommend<\/code> (\n  <code>id<\/code> int(11) NOT NULL AUTO_INCREMENT,\n  <code>user_id<\/code> int(11) NOT NULL,\n  <code>song_id<\/code> int(11) NOT NULL,\n  <code>score<\/code> double NOT NULL,\n  PRIMARY KEY (<code>id<\/code>)\n) ENGINE=InnoDB AUTO_INCREMENT=1;<\/code><\/pre>\n<pre class=\"line-numbers\"><code class=\"language-py\">matrix = [\n    (1, 2, 0.1),\n    (3, 4, 0.23456),\n    (5, 6, 7.89),\n    (7, 8, -10.111213),\n]\ndf = spark.createDataFrame(matrix, ['user_id','song_id', 'score'])\n\nurl = 'jdbc:mysql:\/\/192.168.11.200:3306\/DATABASE?user=USER&amp;password=PASSWORD&amp;verifyServerCertificate=false&amp;useSSL=false&amp;rewriteBatchedStatements=true'\nproperties = {'driver': 'com.mysql.jdbc.Driver'}\ndf \n    .selectExpr('user AS user_id', 'item AS song_id', 'score') \n    .write.jdbc(url, 
<h2>Read data from SQLite<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-bash\">$ pyspark --packages \"org.xerial:sqlite-jdbc:3.16.1\"<\/code><\/pre>\n<pre class=\"line-numbers\"><code class=\"language-py\">from pyspark.sql.functions import lit\nfrom pyspark.sql.types import IntegerType, StructField, StructType, TimestampType\n\nprops = {'driver': 'org.sqlite.JDBC', 'date_string_format': 'yyyy-MM-dd HH:mm:ss'}\ndf = spark.read.jdbc(\"jdbc:sqlite:db.sqlite3\", \"app_repostarring\", properties=props)\n\n# min_stargazers_count comes from the enclosing application code\ndf = df.where(df.stargazers_count &gt;= min_stargazers_count)\ndf = df.select('from_user_id', 'repo_id', 'created_at')\ndf = df.toDF('user', 'item', 'item_created_at')\ndf = df.withColumn('rating', lit(1))\n# reorder columns to match the schema below\ndf = df.select('user', 'item', 'rating', 'item_created_at')\n\nschema = StructType([\n    StructField('user', IntegerType(), nullable=False),\n    StructField('item', IntegerType(), nullable=False),\n    StructField('rating', IntegerType(), nullable=False),\n    StructField('item_created_at', TimestampType(), nullable=False),\n])\ndf = spark.createDataFrame(df.rdd, schema)<\/code><\/pre>\n<p>ref:<br \/>\n<a href=\"https:\/\/github.com\/xerial\/sqlite-jdbc\">https:\/\/github.com\/xerial\/sqlite-jdbc<\/a><\/p>\n<p>java.text.ParseException: Unparseable date: &quot;2016-04-22 17:26:54&quot;<br \/>\n<a href=\"https:\/\/github.com\/xerial\/sqlite-jdbc\/issues\/88\">https:\/\/github.com\/xerial\/sqlite-jdbc\/issues\/88<\/a><\/p>\n
<h2>Read data from parquet<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-py\">from pyspark.sql.utils import AnalysisException\n\nraw_df_filepath = 'raw_df.parquet'\n\ntry:\n    raw_df = spark.read.format('parquet').load(raw_df_filepath)\nexcept AnalysisException as exc:\n    if 'Path does not exist' in exc.desc:\n        raw_df = load_raw_data()\n        raw_df.write.format('parquet').save(raw_df_filepath)\n    else:\n        raise<\/code><\/pre>\n<p>ref:<br \/>\n<a href=\"https:\/\/community.hortonworks.com\/articles\/21303\/write-read-parquet-file-in-spark.html\">https:\/\/community.hortonworks.com\/articles\/21303\/write-read-parquet-file-in-spark.html<\/a><\/p>\n
<h2>Create a DataFrame<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-py\">matrix = [\n    (1, 1, 1),\n    (1, 2, 1),\n    (2, 1, 0),\n    (3, 1, 1),\n    (3, 3, 1),\n    (3, 4, 1),\n    (4, 1, 0),\n    (4, 2, 0),\n    (5, 9, 1),\n    (5, 5, 0),\n]\ndf = spark.createDataFrame(matrix, ['user', 'item', 'rating'])<\/code><\/pre>\n<p>ref:<br \/>\n<a href=\"http:\/\/spark.apache.org\/docs\/latest\/api\/python\/pyspark.sql.html#pyspark.sql.SparkSession.createDataFrame\">http:\/\/spark.apache.org\/docs\/latest\/api\/python\/pyspark.sql.html#pyspark.sql.SparkSession.createDataFrame<\/a><\/p>\n
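<p><code>createDataFrame<\/code> also accepts <code>Row<\/code> objects, which carry their own field names so you can skip the separate column list. A minimal sketch:<\/p>\n<pre class=\"line-numbers\"><code class=\"language-py\">from pyspark.sql import Row\n\nrows = [\n    Row(user=1, item=1, rating=1),\n    Row(user=1, item=2, rating=1),\n    Row(user=2, item=1, rating=0),\n]\ndf = spark.createDataFrame(rows)<\/code><\/pre>\n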
<h2>Create a DataFrame with explicit schema<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-py\">from pyspark.sql.types import *\n\nmatrix = [\n    ('Alice', 0.5, 5.0),\n    ('Bob',   0.2, 92.0),\n    ('Tom',   0.0, 122.0),\n    ('Zack',  0.1, 1.0),\n]\nschema = StructType([\n    StructField('name', StringType(), nullable=False),\n    StructField('prediction', DoubleType(), nullable=False),\n    StructField('rating', DoubleType(), nullable=False),\n])\ndf = spark.createDataFrame(matrix, schema)\ndf.printSchema()\n\nnew_df = spark.createDataFrame(someRDD, df.schema)<\/code><\/pre>\n<p>ref:<br \/>\n<a href=\"https:\/\/github.com\/awesome-spark\/spark-gotchas\/blob\/master\/05_spark_sql_and_dataset_api.md\">https:\/\/github.com\/awesome-spark\/spark-gotchas\/blob\/master\/05_spark_sql_and_dataset_api.md<\/a><\/p>\n
<h2>Create a nested schema<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-py\">from pyspark.sql.types import ArrayType\nfrom pyspark.sql.types import FloatType\nfrom pyspark.sql.types import IntegerType\nfrom pyspark.sql.types import StructField\nfrom pyspark.sql.types import StructType\n\nrecommendation_schema = StructType([\n    StructField('item', IntegerType(), nullable=False),\n    StructField('rating', FloatType(), nullable=False),\n])\n\nuser_recommendations_schema = StructType([\n    StructField('user', IntegerType(), nullable=False),\n    StructField('recommendations', ArrayType(recommendation_schema), nullable=False),\n])\n\nmatrix = [\n    (1, [[100, 1.0], [200, 2.0]]),\n    (2, [[300, 3.0]]),\n]\ndf = spark.createDataFrame(matrix, user_recommendations_schema)\n\ndf.printSchema()\n# root\n# |-- user: integer (nullable = false)\n# |-- recommendations: array (nullable = false)\n# |    |-- element: struct (containsNull = true)\n# |    |    |-- item: integer (nullable = false)\n# |    |    |-- rating: float (nullable = false)\n\ndf.show()\n# +----+--------------------+\n# |user|     recommendations|\n# +----+--------------------+\n# |   1|[[100,1.0], [200,...|\n# |   2|         [[300,3.0]]|\n# +----+--------------------+<\/code><\/pre>\n<p>ref:<br \/>\n<a href=\"https:\/\/spark.apache.org\/docs\/latest\/api\/python\/pyspark.sql.html#module-pyspark.sql.types\">https:\/\/spark.apache.org\/docs\/latest\/api\/python\/pyspark.sql.html#module-pyspark.sql.types<\/a><\/p>\n
<h2>Change schema of a DataFrame<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-py\">from pyspark.sql.types import DoubleType, IntegerType, StructField, StructType\n\nschema = StructType([\n    StructField('user', IntegerType(), nullable=False),\n    StructField('item', IntegerType(), nullable=False),\n    StructField('rating', DoubleType(), nullable=False),\n])\ndf = spark.createDataFrame(df.rdd, schema)<\/code><\/pre>\n
<h2>Get the number of partitions<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-py\">df.rdd.getNumPartitions()<\/code><\/pre>\n
<h2>Split a DataFrame into chunks (partitions)<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-py\">def write_to_db(partial_rows):\n    # year and week come from the enclosing scope\n    values_tuples = [(row.user, row.item, row.score, year, week) for row in partial_rows]\n    result = your_mysql_insert_func(values_tuples)\n    return [result, ]\n\nsave_result = df \\\n    .repartition(400) \\\n    .rdd \\\n    .mapPartitions(write_to_db) \\\n    .collect()<\/code><\/pre>\n<p>ref:<br \/>\n<a href=\"https:\/\/stackoverflow.com\/questions\/24898700\/batching-within-an-apache-spark-rdd-map\">https:\/\/stackoverflow.com\/questions\/24898700\/batching-within-an-apache-spark-rdd-map<\/a><br \/>\n<a href=\"https:\/\/stackoverflow.com\/questions\/35370826\/using-spark-for-sequential-row-by-row-processing-without-map-and-reduce\">https:\/\/stackoverflow.com\/questions\/35370826\/using-spark-for-sequential-row-by-row-processing-without-map-and-reduce<\/a><\/p>\n
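<p>If you only care about the side effect and don't need any result back on the driver, <code>foreachPartition<\/code> is a lighter alternative to <code>mapPartitions<\/code> + <code>collect<\/code>. A minimal sketch, reusing the hypothetical <code>your_mysql_insert_func<\/code> from above:<\/p>\n<pre class=\"line-numbers\"><code class=\"language-py\">def write_partition(partial_rows):\n    values_tuples = [(row.user, row.item, row.score) for row in partial_rows]\n    your_mysql_insert_func(values_tuples)\n\ndf.repartition(400).rdd.foreachPartition(write_partition)<\/code><\/pre>\n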
href=\"https:\/\/stackoverflow.com\/questions\/35370826\/using-spark-for-sequential-row-by-row-processing-without-map-and-reduce\">https:\/\/stackoverflow.com\/questions\/35370826\/using-spark-for-sequential-row-by-row-processing-without-map-and-reduce<\/a><\/p>\n<h2>Show a DataFrame<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-py\"># print the schema in a tree format\ndf.printSchema()\n\n# print top 20 rows\ndf.show()\n\n# print top 100 rows\ndf.show(100)\n\n# calculate the descriptive statistics\ndf.describe().show()\ndf.describe(['like_count', ]).show()<\/code><\/pre>\n<p>ref:<br \/>\n<a href=\"http:\/\/spark.apache.org\/docs\/latest\/sql-programming-guide.html#untyped-dataset-operations-aka-dataframe-operations\">http:\/\/spark.apache.org\/docs\/latest\/sql-programming-guide.html#untyped-dataset-operations-aka-dataframe-operations<\/a><\/p>\n<h2>Create a column with a literal value<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-py\">from pyspark.sql.functions import lit\n\ndf = df.withColumn('like', lit(1))<\/code><\/pre>\n<p>ref:<br \/>\n<a href=\"http:\/\/stackoverflow.com\/questions\/33681487\/how-do-i-add-a-new-column-to-a-spark-dataframe-using-pyspark\">http:\/\/stackoverflow.com\/questions\/33681487\/how-do-i-add-a-new-column-to-a-spark-dataframe-using-pyspark<\/a><\/p>\n<h2>Return a fraction of a DataFrame<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-py\">fraction = {\n    'u1': 0.5,\n    'u2': 0.5,\n    'u3': 0.5,\n    'u4': 0.5,\n    'u5': 0.5,\n}\ndf.sampleBy('user', fraction).show()\n# +----+----+----------+\n# |user|item|prediction|\n# +----+----+----------+\n# |  u1|  i1|         1|\n# |  u3|  i4|         4|\n# |  u4|  i1|         1|\n# |  u4|  i2|         3|\n# |  u5|  i5|         5|\n# +----+----+----------+<\/code><\/pre>\n<h2>Show distinct values of a column<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-py\">df.select('user').distinct().show()<\/code><\/pre>\n<p>ref:<br \/>\n<a href=\"https:\/\/www.analyticsvidhya.com\/blog\/2016\/10\/spark-dataframe-and-operations\/\">https:\/\/www.analyticsvidhya.com\/blog\/2016\/10\/spark-dataframe-and-operations\/<\/a><\/p>\n<h2>Rename columns<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-py\">df.printSchema()\n# root\n# |-- from_user_id: integer (nullable = true)\n # |-- repo_id: integer (nullable = true)\n\ndf = df.toDF('user', 'item')\n# or\ndf = df.withColumnRenamed('from_user_id', 'user').withColumnRenamed('repo_id', 'item')<\/code><\/pre>\n<h2>Convert a column to double type<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-py\">df = df.withColumn('prediction', df['prediction'].cast('double'))<\/code><\/pre>\n<p>ref:<br \/>\n<a href=\"http:\/\/spark.apache.org\/docs\/latest\/api\/python\/pyspark.sql.html#pyspark.sql.Column.cast\">http:\/\/spark.apache.org\/docs\/latest\/api\/python\/pyspark.sql.html#pyspark.sql.Column.cast<\/a><\/p>\n<h2>Update a colume based on conditions<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-py\">from pyspark.sql.functions import when\n\ndf = df.withColumn('label', when(df['prediction'] &gt; 0.5, 1).otherwise(0))<\/code><\/pre>\n<p>ref:<br \/>\n<a href=\"http:\/\/spark.apache.org\/docs\/latest\/api\/python\/pyspark.sql.html#pyspark.sql.functions.when\">http:\/\/spark.apache.org\/docs\/latest\/api\/python\/pyspark.sql.html#pyspark.sql.functions.when<\/a><br \/>\n<a 
href=\"http:\/\/stackoverflow.com\/questions\/34908448\/spark-add-column-to-dataframe-conditionally\">http:\/\/stackoverflow.com\/questions\/34908448\/spark-add-column-to-dataframe-conditionally<\/a><\/p>\n<h2>Drop columns from a DataFrame<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-py\">predictions = predictions.dropna(subset=['prediction', S])<\/code><\/pre>\n<p>ref:<br \/>\n<a href=\"http:\/\/spark.apache.org\/docs\/latest\/api\/python\/pyspark.sql.html#pyspark.sql.DataFrame.dropna\">http:\/\/spark.apache.org\/docs\/latest\/api\/python\/pyspark.sql.html#pyspark.sql.DataFrame.dropna<\/a><\/p>\n<h2>DataFrame subtract another DataFrame<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-py\">matrix1 = [\n    (1, 2, 123),\n    (3, 4, 1),\n    (5, 6, 45),\n    (7, 8, 3424),\n]\ndf1 = spark.createDataFrame(matrix1, ['user','item', 'play_count'])\n\nmatrix2 = [\n    (3, 4),\n    (5, 6),\n    (9, 1),\n    (7, 8),\n    (1, 6),\n    (1, 1),\n]\ndf2 = spark.createDataFrame(matrix2, ['user','item'])\n\ndf2.subtract(df1.select('user', 'item')).show()\n# +----+----+\n# |user|item|\n# +----+----+\n# |   1|   1|\n# |   1|   6|\n# |   9|   1|\n# +----+----+\n\n# or\n\ntesting_rdd = df.rdd.subtract(training.rdd)\ntesting = spark.createDataFrame(testing_rdd, df.schema)<\/code><\/pre>\n<p>ref:<br \/>\n<a href=\"https:\/\/spark.apache.org\/docs\/latest\/api\/python\/pyspark.sql.html#pyspark.sql.DataFrame.subtract\">https:\/\/spark.apache.org\/docs\/latest\/api\/python\/pyspark.sql.html#pyspark.sql.DataFrame.subtract<\/a><\/p>\n<h2>Convert a DataFrame column into a Python list<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-py\"># list\npopluar_items = [row['item'] for row in popular_df.select('item').collect()]\n\n# set\nall_items = {row.id for row in als_model.itemFactors.select('id').collect()}<\/code><\/pre>\n<h2>Concatenate (merge) two DataFrames<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-py\">full_df = df1.union(df2)<\/code><\/pre>\n<h2>Convert a DataFrame to a Python dict<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-py\">d = df.rdd.collectAsMap()\nd['some_key']<\/code><\/pre>\n<h2>Compute (approximate or exact) median of a numerical column<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-py\">approximate_median = df.approxQuantile('count', [0.5, ], 0.25)\nexact_median = df.approxQuantile('count', [0.5, ], 0.0)\n\nmaximum = df.approxQuantile('count', [1.0, ], 0.1)\nminimum = df.approxQuantile('count', [0.0, ], 0.1)<\/code><\/pre>\n<p>ref:<br \/>\n<a href=\"http:\/\/spark.apache.org\/docs\/latest\/api\/python\/pyspark.sql.html#pyspark.sql.DataFrame.approxQuantile\">http:\/\/spark.apache.org\/docs\/latest\/api\/python\/pyspark.sql.html#pyspark.sql.DataFrame.approxQuantile<\/a><\/p>\n<h2>Find frequent items for columns<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-py\">df = rating_df.freqItems(['item', ], support=0.01)\n# +--------------------+\n# |      item_freqItems|\n# +--------------------+\n# |[194512, 371798, ...|\n# +--------------------+\n\n# get the value of a DataFrame column\npopular_items = df.collect()[0].item_freqItems<\/code><\/pre>\n<p>ref:<br \/>\n<a href=\"https:\/\/spark.apache.org\/docs\/latest\/api\/python\/pyspark.sql.html#pyspark.sql.DataFrame.freqItems\">https:\/\/spark.apache.org\/docs\/latest\/api\/python\/pyspark.sql.html#pyspark.sql.DataFrame.freqItems<\/a><\/p>\n<h2>Broadcast a value<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-py\">bc_candidates = sc.broadcast(set([1, 2, 4, 5, 
<h2>Broadcast a DataFrame in join<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-py\">import pyspark.sql.functions as F\n\nlarge_df.join(F.broadcast(small_df), 'some_key')<\/code><\/pre>\n<p>ref:<br \/>\n<a href=\"http:\/\/stackoverflow.com\/questions\/34053302\/pyspark-and-broadcast-join-example\">http:\/\/stackoverflow.com\/questions\/34053302\/pyspark-and-broadcast-join-example<\/a><br \/>\n<a href=\"https:\/\/chapeau.freevariable.com\/2014\/09\/improving-spark-application-performance.html\">https:\/\/chapeau.freevariable.com\/2014\/09\/improving-spark-application-performance.html<\/a><\/p>\n
<h2>Cache a DataFrame<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-py\">df.cache()<\/code><\/pre>\n<p>ref:<br \/>\n<a href=\"http:\/\/stackoverflow.com\/questions\/38056774\/spark-cache-vs-broadcast\">http:\/\/stackoverflow.com\/questions\/38056774\/spark-cache-vs-broadcast<\/a><\/p>\n
<h2>Show query execution plan<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-py\">df.explain(extended=True)<\/code><\/pre>\n
<h2>Use SQL to query a DataFrame<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-py\">props = {'driver': 'org.sqlite.JDBC', 'date_string_format': 'yyyy-MM-dd HH:mm:ss'}\ndf = spark.read.jdbc(\"jdbc:sqlite:db.sqlite3\", \"app_repostarring\", properties=props)\ndf.createOrReplaceTempView('repo_stars')\n\nquery = 'SELECT DISTINCT repo_id AS item FROM repo_stars WHERE stargazers_count &gt; 1000'\ndf2 = spark.sql(query)\ndf2.show()\n\nquery = \"\"\"\nSELECT\n    from_user_id AS user,\n    count(repo_id) AS count\nFROM repo_stars\nGROUP BY from_user_id\nORDER BY count DESC\n\"\"\"\ndf = spark.sql(query)\n\n# top_n comes from the enclosing scope\nparams = {'top_n': top_n}\nquery = \"\"\"\nSELECT\n    repo_id AS item,\n    MAX(stargazers_count) AS stars\nFROM repo_stars\nGROUP BY repo_id\nORDER BY stars DESC\nLIMIT {top_n}\n\"\"\".format(**params)\npopular_df = spark.sql(query)<\/code><\/pre>\n<p>ref:<br \/>\n<a href=\"https:\/\/sparkour.urizone.net\/recipes\/using-sql-udf\/\">https:\/\/sparkour.urizone.net\/recipes\/using-sql-udf\/<\/a><\/p>\n
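<p>The reference above also covers user-defined functions. A minimal sketch of registering a Python UDF so it can be called from SQL; the function itself is just an illustrative example:<\/p>\n<pre class=\"line-numbers\"><code class=\"language-py\">from pyspark.sql.types import StringType\n\ndef repo_name_of(repo_url):\n    return repo_url.rstrip('\/').split('\/')[-1]\n\nspark.udf.register('repo_name_of', repo_name_of, StringType())\nspark.sql('SELECT repo_url, repo_name_of(repo_url) AS repo_name FROM repo_stars').show()<\/code><\/pre>\n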
<h2>WHERE ... IN ...<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-py\">from pyspark.sql.functions import col\n\nitem_ids = [1, 2, 5, 8]\nraw_df \\\n    .where(col('repo_id').isin(item_ids)) \\\n    .select('repo_url') \\\n    .collect()<\/code><\/pre>\n
<h2>ORDER BY multiple columns<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-py\">import pyspark.sql.functions as F\n\nrating_df = raw_df \\\n    .selectExpr('from_user_id AS user', 'repo_id AS item', '1 AS rating', 'starred_at') \\\n    .orderBy('user', F.col('starred_at').desc())<\/code><\/pre>\n
<h2>Aggregate<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-py\">import pyspark.sql.functions as F\n\ndf.agg(F.min('user'), F.max('user'), F.min('item'), F.max('item')).show()\n\nmax_value = user_star_count_df.agg(F.max('stars').alias('max_stars')).collect()[0]['max_stars']<\/code><\/pre>\n
<h2>SELECT COUNT(DISTINCT xxx) ...<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-py\">import pyspark.sql.functions as F\n\nmatrix = [\n    (1, 1, 1),\n    (1, 2, 1),\n    (2, 1, 0),\n    (3, 1, 1),\n    (3, 3, 1),\n    (3, 4, 1),\n    (4, 1, 0),\n    (4, 2, 0),\n    (5, 9, 1),\n    (5, 5, 0),\n]\ndf = spark.createDataFrame(matrix, ['user', 'item', 'rating'])\ndf.agg(F.countDistinct('user')).show()\n# +--------------------+\n# |count(DISTINCT user)|\n# +--------------------+\n# |                   5|\n# +--------------------+<\/code><\/pre>\n<p>ref:<br \/>\n<a href=\"http:\/\/stackoverflow.com\/questions\/40888946\/spark-dataframe-count-distinct-values-of-every-column\">http:\/\/stackoverflow.com\/questions\/40888946\/spark-dataframe-count-distinct-values-of-every-column<\/a><\/p>\n
<h2>SELECT MAX(xxx) ... GROUP BY<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-py\">import pyspark.sql.functions as F\n\ndf.groupBy('user').count().filter('count &gt;= 4').show()\n\npopular_df = raw_df \\\n    .groupBy('repo_id') \\\n    .agg(F.max('stargazers_count').alias('stars')) \\\n    .orderBy('stars', ascending=False) \\\n    .limit(top_n)<\/code><\/pre>\n<p>ref:<br \/>\n<a href=\"http:\/\/stackoverflow.com\/questions\/30616380\/spark-how-to-count-number-of-records-by-key\">http:\/\/stackoverflow.com\/questions\/30616380\/spark-how-to-count-number-of-records-by-key<\/a><\/p>\n
<h2>SELECT COUNT() ... GROUP BY<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-py\">import pyspark.sql.functions as F\n\nprediction_df.groupBy('user').count().show()\n# +------+-----+\n# |  user|count|\n# +------+-----+\n# |649661|   15|\n# |464340|   15|\n# |468780|   15|\n# |489233|   11|\n# |455143|   14|\n# +------+-----+\n\nstargazers_count_df = rating_df \\\n    .groupBy('item') \\\n    .agg(F.count('user').alias('stargazers_count')) \\\n    .orderBy('stargazers_count', ascending=False)\n# +--------+----------------+\n# |    item|stargazers_count|\n# +--------+----------------+\n# | 3544424|             137|\n# | 2126244|             120|\n# | 7691631|             115|\n# | 1362490|             112|\n# |  943149|             112|\n# +--------+----------------+\n\nstarred_count_df = rating_df \\\n    .groupBy('user') \\\n    .agg(F.count('item').alias('starred_count')) \\\n    .orderBy('starred_count', ascending=False)\n# +-------+-------------+\n# |   user|starred_count|\n# +-------+-------------+\n# |  48936|         7201|\n# |4560482|         5898|\n# |3382565|         3978|\n# |  10652|         3586|\n# |  31996|         3459|\n# +-------+-------------+<\/code><\/pre>\n<p>You may want to use <code>approx_count_distinct<\/code>.<\/p>\n
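<p>For a cheap, approximate version of the counts above, <code>approx_count_distinct<\/code> (Spark 2.1+; previously <code>approxCountDistinct<\/code>) trades a little accuracy for speed. A minimal sketch:<\/p>\n<pre class=\"line-numbers\"><code class=\"language-py\">import pyspark.sql.functions as F\n\nrating_df \\\n    .groupBy('item') \\\n    .agg(F.approx_count_distinct('user').alias('approx_stargazers_count')) \\\n    .show()<\/code><\/pre>\n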
<h2>GROUP_CONCAT a column<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-py\">from pyspark.sql.functions import expr\n\nper_user_predictions_df = output_df \\\n    .orderBy(['user', 'prediction'], ascending=False) \\\n    .groupBy('user') \\\n    .agg(expr('collect_list(item) as items'))\n# +--------+--------------------+\n# |    user|               items|\n# +--------+--------------------+\n# |    2142|[36560369, 197450...|\n# |   47217|[40693501, 643554...|\n# +--------+--------------------+<\/code><\/pre>\n<p>ref:<br \/>\n<a href=\"http:\/\/stackoverflow.com\/questions\/31640729\/spark-sql-replacement-for-mysql-group-concat-aggregate-function\">http:\/\/stackoverflow.com\/questions\/31640729\/spark-sql-replacement-for-mysql-group-concat-aggregate-function<\/a><\/p>\n
<h2>GROUP_CONCAT multiple columns<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-py\">from pyspark.sql.functions import col, collect_list, struct\n\nmatrix = [\n    (1, 1, 0.1),\n    (1, 2, 5.1),\n    (1, 6, 0.0),\n    (2, 6, 9.3),\n    (3, 1, 0.54),\n    (3, 5, 0.83),\n    (4, 1, 0.65),\n    (4, 4, 1.023),\n]\ndf = spark.createDataFrame(matrix, ['user', 'item', 'prediction'])\n\ndf \\\n    .groupBy(\"user\") \\\n    .agg(collect_list(struct(col('item'), col('prediction'))).alias(\"recommendations\")) \\\n    .show(truncate=False)\n# +----+---------------------------+\n# |user|recommendations            |\n# +----+---------------------------+\n# |1   |[[1,0.1], [2,5.1], [6,0.0]]|\n# |3   |[[1,0.54], [5,0.83]]       |\n# |2   |[[6,9.3]]                  |\n# |4   |[[1,0.65], [4,1.023]]      |\n# +----+---------------------------+<\/code><\/pre>\n<p>ref:<br \/>\n<a href=\"https:\/\/stackoverflow.com\/questions\/37737843\/aggregating-multiple-columns-with-custom-function-in-spark\">https:\/\/stackoverflow.com\/questions\/37737843\/aggregating-multiple-columns-with-custom-function-in-spark<\/a><\/p>\n
<h2>SELECT ... RANK() OVER (PARTITION BY ... ORDER BY)<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-py\">from pyspark.sql import Window\nfrom pyspark.sql.functions import col\nfrom pyspark.sql.functions import expr\nimport pyspark.sql.functions as F\n\nwindow_spec = Window.partitionBy('from_user_id').orderBy(col('starred_at').desc())\nper_user_actual_items_df = raw_df \\\n    .select('from_user_id', 'repo_id', 'starred_at', F.rank().over(window_spec).alias('rank')) \\\n    .where('rank &lt;= 10') \\\n    .groupBy('from_user_id') \\\n    .agg(expr('collect_list(repo_id) as items')) \\\n    .withColumnRenamed('from_user_id', 'user')\n# +--------+--------------------+\n# |    user|               items|\n# +--------+--------------------+\n# |    2142|[29122050, 634846...|\n# |   59990|[9820191, 8729416...|\n# +--------+--------------------+\n\nwindow_spec = Window.partitionBy('user').orderBy(col('prediction').desc())\nper_user_predicted_items_df = output_df \\\n    .select('user', 'item', 'prediction', F.rank().over(window_spec).alias('rank')) \\\n    .where('rank &lt;= 10') \\\n    .groupBy('user') \\\n    .agg(expr('collect_list(item) as items'))\n# +--------+--------------------+\n# |    user|               items|\n# +--------+--------------------+\n# |    2142|[36560369, 197450...|\n# |   47217|[40693501, 643554...|\n# +--------+--------------------+<\/code><\/pre>\n<p>ref:<br \/>\n<a href=\"https:\/\/databricks.com\/blog\/2015\/07\/15\/introducing-window-functions-in-spark-sql.html\">https:\/\/databricks.com\/blog\/2015\/07\/15\/introducing-window-functions-in-spark-sql.html<\/a><br \/>\n<a href=\"https:\/\/github.com\/awesome-spark\/spark-gotchas\/blob\/master\/05_spark_sql_and_dataset_api.md#window-functions\">https:\/\/github.com\/awesome-spark\/spark-gotchas\/blob\/master\/05_spark_sql_and_dataset_api.md#window-functions<\/a><\/p>\n
<h2>Left anti join \/ Left excluding join<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-py\">clean_df = rating_df.join(to_remove_items, 'item', 'left_anti')<\/code><\/pre>\n<p>ref:<br \/>\n<a href=\"https:\/\/www.codeproject.com\/Articles\/33052\/Visual-Representation-of-SQL-Joins\">https:\/\/www.codeproject.com\/Articles\/33052\/Visual-Representation-of-SQL-Joins<\/a><\/p>\n
<h2>Outer join<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-py\">m1 = [\n    (1, 1, 1),\n    (1, 2, 1),\n    (1, 4, 1),\n    (2, 2, 1),\n    (2, 3, 1),\n    (3, 5, 1),\n]\nm1df = spark.createDataFrame(m1, ['user', 'item', 'rating'])\nm1df = m1df.where('user = 1').alias('m1df')\n\nm2 = [\n    (1, 100),\n    (2, 200),\n    (3, 300),\n    (4, 400),\n    (5, 500),\n    (6, 600),\n]\nm2df = spark.createDataFrame(m2, ['item', 'count'])\nm2df = m2df.alias('m2df')\n\nm1df.join(m2df, m1df.item == m2df.item, 'rightouter') \\\n    .where('m1df.user IS NULL') \\\n    .orderBy('m2df.count', ascending=False) \\\n    .selectExpr('1 AS user', 'm2df.item', '0 AS rating') \\\n    .show()\n# +----+----+------+\n# |user|item|rating|\n# +----+----+------+\n# |   1|   6|     0|\n# |   1|   5|     0|\n# |   1|   3|     0|\n# +----+----+------+<\/code><\/pre>\n<p>ref:<br \/>\n<a href=\"https:\/\/jaceklaskowski.gitbooks.io\/mastering-apache-spark\/content\/spark-sql-joins.html\">https:\/\/jaceklaskowski.gitbooks.io\/mastering-apache-spark\/content\/spark-sql-joins.html<\/a><\/p>\n
<h2>Cross join<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-py\">df = m1df.select('user').distinct().crossJoin(m2df.select('item'))\ndf.show()\n\n# register the DataFrames as temp views before referencing them in SQL\nm1df.createOrReplaceTempView('f1')\nm2df.createOrReplaceTempView('f2')\nquery = \"\"\"\nSELECT f1.user, f2.item, 0 AS rating\nFROM f1\nCROSS JOIN f2\n\"\"\"\ndf = spark.sql(query)\ndf.show()\n\nall_user_item_pair_df = als_model.userFactors.selectExpr('id AS user') \\\n    .crossJoin(als_model.itemFactors.selectExpr('id AS item'))\n# +----+----+\n# |user|item|\n# +----+----+\n# |xxxx|oooo|\n# +----+----+<\/code><\/pre>\n<p>ref:<br \/>\n<a href=\"http:\/\/stackoverflow.com\/questions\/5464131\/finding-pairs-that-do-not-exist-in-a-different-table\">http:\/\/stackoverflow.com\/questions\/5464131\/finding-pairs-that-do-not-exist-in-a-different-table<\/a><\/p>\n
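<p>Note: in Spark 2.x, SQL that produces an implicit cartesian product may be rejected, while <code>crossJoin()<\/code> declares the intent explicitly. If you hit that error with plain SQL, the escape hatch (assuming Spark 2.x) is:<\/p>\n<pre class=\"line-numbers\"><code class=\"language-py\">spark.conf.set('spark.sql.crossJoin.enabled', 'true')<\/code><\/pre>\n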
","protected":false},"excerpt":{"rendered":"<p>Spark SQL is a Spark module for structured data processing. Unlike the basic Spark RDD API, the interfaces provided by Spark SQL provide Spark with more information about the structure of both the data and the computation being performed.<\/p>\n","protected":false},"author":1,"featured_media":383,"comment_status":"closed","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[97,112,4],"tags":[108,2],"class_list":["post-382","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","category-about-ai","category-about-big-data","category-about-python","tag-apache-spark","tag-python"],"_links":{"self":[{"href":"https:\/\/vinta.ws\/code\/wp-json\/wp\/v2\/posts\/382","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/vinta.ws\/code\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/vinta.ws\/code\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/vinta.ws\/code\/wp-json\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/vinta.ws\/code\/wp-json\/wp\/v2\/comments?post=382"}],"version-history":[{"count":0,"href":"https:\/\/vinta.ws\/code\/wp-json\/wp\/v2\/posts\/382\/revisions"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/vinta.ws\/code\/wp-json\/wp\/v2\/media\/383"}],"wp:attachment":[{"href":"https:\/\/vinta.ws\/code\/wp-json\/wp\/v2\/media?parent=382"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/vinta.ws\/code\/wp-json\/wp\/v2\/categories?post=382"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/vinta.ws\/code\/wp-json\/wp\/v2\/tags?post=382"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}