{"id":429,"date":"2017-09-02T16:54:58","date_gmt":"2017-09-02T08:54:58","guid":{"rendered":"http:\/\/vinta.ws\/code\/?p=429"},"modified":"2026-03-17T01:27:42","modified_gmt":"2026-03-16T17:27:42","slug":"spark-sql-cookbook-scala","status":"publish","type":"post","link":"https:\/\/vinta.ws\/code\/spark-sql-cookbook-scala.html","title":{"rendered":"Spark SQL cookbook (Scala)"},"content":{"rendered":"<p>Scala is the first class citizen language for interacting with Apache Spark, but it's difficult to learn. This article is mostly about operating DataFrame or Dataset in Spark SQL.<\/p>\n<h2>Access SparkSession<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-scala\">import org.apache.spark.SparkConf\nimport org.apache.spark.sql.SparkSession\n\nval conf = new SparkConf()\nconf.set(\"spark.serializer\", \"org.apache.spark.serializer.KryoSerializer\")\n\nimplicit val spark: SparkSession = SparkSession\n  .builder()\n  .appName(\"PersonalizedRanker\")\n  .config(conf)\n  .getOrCreate()\n\nval sc = spark.sparkContext\nsc.setLogLevel(\"WARN\")<\/code><\/pre>\n<p>Before Spark 2.0:<\/p>\n<pre class=\"line-numbers\"><code class=\"language-scala\">import org.apache.spark.SparkConf\nimport org.apache.spark.SparkContext\n\nval conf: SparkConf = new SparkConf()\n  .setAppName(\"PersonalizedRanker\")\n  .setMaster(\"local[*]\")\n\nval sc: SparkContext = new SparkContext(conf)<\/code><\/pre>\n<p>ref:<br \/>\n<a href=\"https:\/\/sparkour.urizone.net\/recipes\/understanding-sparksession\/\">https:\/\/sparkour.urizone.net\/recipes\/understanding-sparksession\/<\/a><br \/>\n<a href=\"https:\/\/spark.apache.org\/docs\/latest\/configuration.html\">https:\/\/spark.apache.org\/docs\/latest\/configuration.html<\/a><\/p>\n<h2>Import Implicits<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-scala\">val spark = SparkSession.builder().getOrCreate()\n\nimport spark.implicits._\n\n\/\/ you could also access SparkSession via any Dataset or DataFrame\nimport someDS.sparkSession.implicits._\nimport someDF.sparkSession.implicits._\n\nval ratingDF = rawDF\n  .selectExpr(\"from_user_id AS user\", \"repo_id AS item\", \"1 AS rating\", \"starred_at\")\n  .orderBy($\"user\", $\"starred_at\".desc)<\/code><\/pre>\n<h2>Read Data from MySQL<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-scala\">import java.util.Properties\n\nval dbUrl = \"jdbc:mysql:\/\/mysql:3306\/albedo?user=root&amp;password=123&amp;verifyServerCertificate=false&amp;useSSL=false\"\nval prop = new Properties()\nprop.put(\"driver\", \"com.mysql.jdbc.Driver\")\nval rawDF = spark.read.jdbc(dbUrl, \"app_repostarring\", prop)<\/code><\/pre>\n<p>ref:<br \/>\n<a href=\"https:\/\/docs.databricks.com\/spark\/latest\/data-sources\/sql-databases.html\">https:\/\/docs.databricks.com\/spark\/latest\/data-sources\/sql-databases.html<\/a><\/p>\n<h2>Read Data from an Apache Parquet File<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-scala\">import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, SparkSession}\n\nimport spark.implicits._\n\ncase class RepoStarring(\n  user_id: Int,\n  repo_id: Int,\n  starred_at: java.sql.Timestamp,\n  starring: Double\n)\n\nval dataDir = \".\"\nval today = \"20170820\"\n\nval savePath = s\"$dataDir\/spark-data\/$today\/repoStarringDF.parquet\"\nval df: DataFrame = try {\n  spark.read.parquet(savePath)\n} catch {\n  case e: AnalysisException =&gt; {\n    if (e.getMessage().contains(\"Path does not exist\")) {\n      val df = spark.read.jdbc(dbUrl, \"app_repostarring\", props)\n        
.select(\"user_id\", \"repo_id\", \"starred_at\")\n        .withColumn(\"starring\", lit(1.0))\n      df.write.mode(\"overwrite\").parquet(savePath)\n      df\n    } else {\n      throw e\n    }\n  }\n}\ndf.as[RepoStarring]<\/code><\/pre>\n<h2>Read Data from an Apache Avro File<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-bash\">$ spark-submit --packages \"com.databricks:spark-avro_2.11:3.2.0\"<\/code><\/pre>\n<pre class=\"line-numbers\"><code class=\"language-scala\">import com.databricks.spark.avro._\nimport org.apache.spark.sql.SparkSession\n\nimplicit val spark = SparkSession\n  .builder()\n  .appName(\"Word2VecTrainer\")\n  .getOrCreate()\n\nval df = spark.read.avro(\".\/githubarchive_repo_info.avro\")\ndf.show()<\/code><\/pre>\n<p>ref:<br \/>\n<a href=\"https:\/\/github.com\/databricks\/spark-avro\">https:\/\/github.com\/databricks\/spark-avro<\/a><\/p>\n<h2>Create a RDD from a list of Certain Case Class<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-scala\">import org.apache.spark.rdd.RDD\n\ncase class WikipediaArticle(title: String, text: String) {\n  def mentionsLanguage(lang: String): Boolean = text.split(' ').contains(lang)\n}\n\nval wikiRdd: RDD[WikipediaArticle] = sc.parallelize(Seq(\n  WikipediaArticle(\"a1\", \"abc Scala xyz\"),\n  WikipediaArticle(\"a2\", \"abc Python xyz\"),\n  WikipediaArticle(\"a3\", \"abc Scala xyz\"),\n  WikipediaArticle(\"a4\", \"abc Scala xyz\")\n))<\/code><\/pre>\n<h2>Get a Subset of a RDD<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-scala\">val subsetArr = someRDD.take(1000)\nvar subsetRDD = sc.parallelize(subsetArr)<\/code><\/pre>\n<h2>Create a DataFrame from a Seq<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-scala\">import org.apache.spark.sql.functions._\n\nval df = spark.createDataFrame(Seq(\n    (1, 1, 1, \"2017-05-16 20:01:00.0\"),\n    (1, 2, 1, \"2017-05-17 21:01:00.0\"),\n    (2, 1, 0, \"2017-05-18 22:01:00.0\"),\n    (3, 1, 1, \"2017-05-10 22:01:00.0\")\n  ))\n  .toDF(\"user\", \"item\", \"star\", \"starred_at\")\n  .withColumn(\"starred_at\", unix_timestamp(col(\"starred_at\")).cast(\"timestamp\"))\n\/\/ you could override the schema\n\/\/ val df = spark.createDataFrame(df.rdd, starringSchema)\n\ncase class Starring(user: Int, item: Int, star: Int, starred_at: java.sql.Timestamp)\nval ds = df.as[Starring]\n\ndf.printSchema()\n\/\/ root\n\/\/ |-- user: integer (nullable = false)\n\/\/ |-- item: integer (nullable = false)\n\/\/ |-- star: integer (nullable = false)\n\/\/ |-- starred_at: timestamp (nullable = false)<\/code><\/pre>\n<p>ref:<br \/>\n<a href=\"https:\/\/medium.com\/@mrpowers\/manually-creating-spark-dataframes-b14dae906393\">https:\/\/medium.com\/@mrpowers\/manually-creating-spark-dataframes-b14dae906393<\/a><\/p>\n<h2>Create a Dataset<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-scala\">import spark.implicits._\n\nval ds1 = spark.read.json(\"people.json\").as[Person]\nval ds2 = df.toDS\nval ds3 = rdd.toDS<\/code><\/pre>\n<p>ref:<br \/>\n<a href=\"https:\/\/www.coursera.org\/learn\/scala-spark-big-data\/lecture\/yrfPh\/datasets\">https:\/\/www.coursera.org\/learn\/scala-spark-big-data\/lecture\/yrfPh\/datasets<\/a><\/p>\n<h2>Statistics<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-scala\">repoInfoDS.count()\n\/\/ 16000\n\nrepoInfoDS\n  .describe(\"language\", \"size\", \"stargazers_count\", \"forks_count\", \"subscribers_count\", \"open_issues_count\")\n  .show()\n\/\/ 
+-------+--------+-----------------+-----------------+----------------+-----------------+-----------------+\n\/\/ |summary|language|             size| stargazers_count|     forks_count|subscribers_count|open_issues_count|\n\/\/ +-------+--------+-----------------+-----------------+----------------+-----------------+-----------------+\n\/\/ |  count|   16000|            16000|            16000|           16000|            16000|            16000|\n\/\/ |   mean|    null|    14777.8928125|      238.5754375|       61.164375|        20.802375|         9.896625|\n\/\/ | stddev|    null|133926.8365289536|1206.220336641683|394.950236056925|84.09955924587845|56.72585435688847|\n\/\/ |    min|   ANTLR|                0|                2|               0|                0|                0|\n\/\/ |    max|    XSLT|          9427027|            56966|           28338|             4430|             3503|\n\/\/ +-------+--------+-----------------+-----------------+----------------+-----------------+-----------------+\n\nrepoInfoDS.stat.corr(\"stargazers_count\", \"forks_count\")\n\/\/ 0.8408082958003276\n\nrepoInfoDS.stat.corr(\"stargazers_count\", \"size\")\n\/\/ 0.05351197356230549\n\nrepoInfoDS.stat.freqItems(Seq(\"language\"), 0.1).show(truncate=false)\n\/\/ +-----------------------------------------------------------+\n\/\/ |language_freqItems                                         |\n\/\/ +-----------------------------------------------------------+\n\/\/ |[Ruby, Objective-C, C, Go, JavaScript, Swift, Java, Python]|\n\/\/ +-----------------------------------------------------------+<\/code><\/pre>\n<p>ref:<br \/>\n<a href=\"https:\/\/databricks.com\/blog\/2015\/06\/02\/statistical-and-mathematical-functions-with-dataframes-in-spark.html\">https:\/\/databricks.com\/blog\/2015\/06\/02\/statistical-and-mathematical-functions-with-dataframes-in-spark.html<\/a><\/p>\n<h2>Generate a Schema from a List of Column Names<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-scala\">import org.apache.spark.sql.types._\n\ndef generateSchema(columnNames: List[String]): StructType = {\n  val fields = columnNames.map({\n    case columnName: String if columnName == \"tucaseid\" =&gt; {\n      StructField(columnName, StringType, nullable = false)\n    }\n    case columnName =&gt; {\n      StructField(columnName, DoubleType, nullable = false)\n    }\n  })\n  StructType(fields.toList)\n}\n\nval columnNames = List(\"tucaseid\", \"gemetsta\", \"gtmetsta\", \"peeduca\", \"pehspnon\", \"ptdtrace\", \"teage\")\nval schema = generateSchema(columnNames)<\/code><\/pre>\n<h2>Generate a Schema from a Case Class<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-scala\">import java.sql.Timestamp\nimport org.apache.spark.sql.Encoders\n\ncase class Starring(user: Int, item: Int, star: Int, starred_at: Timestamp)\nval starringSchema = Encoders.product[Starring].schema<\/code><\/pre>\n<p>ref:<br \/>\n<a href=\"https:\/\/stackoverflow.com\/questions\/36746055\/generate-a-spark-structtype-schema-from-a-case-class\">https:\/\/stackoverflow.com\/questions\/36746055\/generate-a-spark-structtype-schema-from-a-case-class<\/a><\/p>\n<h2>Change a DataFrame's Schema<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-scala\">val newDF = spark.createDataFrame(oldDF.rdd, starringSchema)<\/code><\/pre>\n<h2>Print the Vector Length of Specific Columns<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-scala\">import org.apache.spark.ml.linalg.Vector\n\nuserProfileDF.select(\"languages_preferences_w2v\", 
\"clean_bio_w2v\")\n  .head\n  .toSeq.foreach((field: Any) =&gt; {\n    println(field.asInstanceOf[Vector].size)\n  })<\/code><\/pre>\n<h2>Convert a DataFrame into a Dataset<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-scala\">import java.sql.Timestamp\nimport org.apache.spark.ml.feature.SQLTransformer\n\ncase class Starring(user: Int, item: Int, star: Int, starred_at: Timestamp)\n\nval starringBuilder = new SQLTransformer()\nval starringSQL = \"\"\"\nSELECT from_user_id AS user, repo_id AS item, 1 AS star, starred_at\nFROM __THIS__\nORDER BY user, starred_at DESC\n\"\"\"\nstarringBuilder.setStatement(starringSQL)\nval starringDS = starringBuilder.transform(rawDF).as[Starring]<\/code><\/pre>\n<h2>Convert a Dataset into a PairRDD<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-scala\">val pairRDD = dataset\n  .select(\"user\", \"item\")\n  .rdd\n  .map(row =&gt; (row(0), row(1)))\n\n\/\/ or\n\ncase class UserItemPair(user: Int, item: Int)\n\nval pairRDD = dataset\n  .select(\"user\", \"item\").as[UserItemPair]\n  .rdd\n  .map(row =&gt; (row.user, row.item))\n\n\/\/ or\n\nimport org.apache.spark.sql.Row\n\nval pairRDD = dataset\n  .select(\"user\", \"item\")\n  .rdd\n  .map({\n    case Row(user: Int, item: Int) =&gt; (user, item)\n  })<\/code><\/pre>\n<p>ref:<br \/>\n<a href=\"https:\/\/stackoverflow.com\/questions\/30655914\/map-rdd-to-pairrdd-in-scala\">https:\/\/stackoverflow.com\/questions\/30655914\/map-rdd-to-pairrdd-in-scala<\/a><\/p>\n<h2>Convert a Dataset into a List or Set<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-scala\">val popularRepos = popularReposDS\n  .select(\"item\")\n  .rdd\n  .map(r =&gt; r(0).asInstanceOf[Int])\n  .collect()\n  .to[List]\n\n\/\/ or\n\nval popularRepos = popularReposDS\n  .select($\"repo_id\".as[Int])\n  .collect()\n  .to[List]\n\n\/\/ List(98837133, 97071697, 17439026, ...)<\/code><\/pre>\n<p>ref:<br \/>\n<a href=\"https:\/\/stackoverflow.com\/questions\/32000646\/extract-column-values-of-dataframe-as-list-in-apache-spark\">https:\/\/stackoverflow.com\/questions\/32000646\/extract-column-values-of-dataframe-as-list-in-apache-spark<\/a><\/p>\n<h2>Add a Column to a DataFrame<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-scala\">import org.apache.spark.sql.functions.lit\n\nval newDF = df.withColumn(\"starring\", lit(1))<\/code><\/pre>\n<h2>Repeat withColumn()<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-scala\">import org.apache.spark.sql.DataFrame\n\nval booleanColumnNames = Array(\"fork\", \"has_issues\", \"has_projects\", \"has_downloads\", \"has_wiki\", \"has_pages\")\nval convertedRepoInfoDF = booleanColumnNames.foldLeft[DataFrame](cleanRepoInfoDF)(\n  (accDF, columnName) =&gt; accDF.withColumn(s\"clean_$columnName\", col(columnName).cast(\"int\"))\n)\n\n\/\/ or\n\nval lowerableColumnNames = Array(\"description\", \"language\", \"topics\")\nval booleanColumnNames = Array(\"fork\", \"has_issues\", \"has_projects\", \"has_downloads\", \"has_wiki\", \"has_pages\")\nval convertedRepoInfoDF = (lowerableColumnNames ++ booleanColumnNames).foldLeft[DataFrame](cleanRepoInfoDF)((accDF, columnName) =&gt; {\n  columnName match {\n    case _ if lowerableColumnNames.contains(columnName) =&gt; \n      accDF.withColumn(s\"clean_$columnName\", lower(col(columnName)))\n    case _ if booleanColumnNames.contains(columnName) =&gt; \n      accDF.withColumn(s\"clean_$columnName\", col(columnName).cast(\"int\"))\n  }\n})<\/code><\/pre>\n<p>ref:<br \/>\n<a 
href=\"https:\/\/stackoverflow.com\/questions\/41400504\/spark-scala-repeated-calls-to-withcolumn-using-the-same-function-on-multiple-c\">https:\/\/stackoverflow.com\/questions\/41400504\/spark-scala-repeated-calls-to-withcolumn-using-the-same-function-on-multiple-c<\/a><\/p>\n<h2>Specify that a Row has any Nullable Column<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-scala\">val nullableColumnNames = Array(\"bio\", \"blog\", \"company\", \"email\", \"location\", \"name\")\n\nval df2 = df1.withColumn(\"has_null\", when(nullableColumnNames.map(df1(_).isNull).reduce(_||_), true).otherwise(false))\n\/\/ or\nval df2 = df1.withColumn(\"has_null\", when($\"bio\".isNull or\n                                          $\"blog\".isNull or\n                                          $\"company\".isNull or\n                                          $\"email\".isNull or\n                                          $\"location\".isNull or\n                                          $\"name\".isNull, true).otherwise(false))<\/code><\/pre>\n<h2>Combine Two Columns into an Array<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-scala\">import org.apache.spark.sql.functions._\n\nfixRepoInfoDS\n  .where(\"topics != ''\")\n  .withColumn(\"tags\", struct(\"language\", \"topics\"))\n  .select(\"tags\")\n  .show(truncate=false)\n\/\/ +----------------------------------------------------------------------+\n\/\/ |tags                                                                  |\n\/\/ +----------------------------------------------------------------------+\n\/\/ |[Ruby,aasm,activerecord,mongoid,ruby,state-machine,transition]        |\n\/\/ |[Ruby,captcha,rails,recaptcha,ruby,sinatra]                           |\n\/\/ |[Python,commandline,gtd,python,sqlite,todo]                           |\n\/\/ ...\n\/\/ +----------------------------------------------------------------------+<\/code><\/pre>\n<h2>Select Nested Column Directly<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-scala\">userRecommendationsDF\n+----+-----------------------------------------------------------------+\n|user|recommendations                                                  |\n+----+-----------------------------------------------------------------+\n|1   |[[2,0.9997897], [4,0.9988761], [1,0.9988487], [5,0.0]]           |\n|3   |[[5,0.9984464], [1,2.9802322E-8], [2,0.0], [4,0.0]]              |\n|2   |[[4,0.9921428], [2,0.10759391], [1,0.10749264], [5,1.4901161E-8]]|\n+----+-----------------------------------------------------------------+\n\nuserRecommendationsDF.printSchema()\n\/\/ root\n\/\/ |-- user: integer (nullable = false)\n\/\/ |-- recommendations: array (nullable = true)\n\/\/ |    |-- element: struct (containsNull = true)\n\/\/ |    |    |-- item: integer (nullable = true)\n\/\/ |    |    |-- rating: float (nullable = true)\n\nval userItemsDF = userRecommendationsDF.select($\"user\", $\"recommendations.item\")\n\/\/ +----+------------+\n\/\/ |user|        item|\n\/\/ +----+------------+\n\/\/ |   1|[2, 4, 1, 5]|\n\/\/ |   3|[5, 1, 2, 4]|\n\/\/ |   2|[4, 2, 1, 5]|\n\/\/ +----+------------+<\/code><\/pre>\n<p>ref:<br \/>\n<a href=\"https:\/\/stackoverflow.com\/questions\/38413040\/spark-sql-udf-with-complex-input-parameter\">https:\/\/stackoverflow.com\/questions\/38413040\/spark-sql-udf-with-complex-input-parameter<\/a><\/p>\n<h2>Flatten an Array Column<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-scala\">import org.apache.spark.sql.Row\n\nval userRecommenderItemDF = 
\n  alsModel.recommendForAllUsers(15)\n  .flatMap((row: Row) =&gt; {\n    val userID = row.getInt(0)\n    val recommendations = row.getSeq[Row](1)\n    recommendations.map {\n      case Row(repoID: Int, score: Float) =&gt; {\n        (userID, repoID, score, \"als\")\n      }\n    }\n  })\n  .toDF(\"user_id\", \"repo_id\", \"score\", \"source\")\n\nuserRecommenderItemDF.show()\n\/\/ +-------+-------+-------------+------+\n\/\/ |user_id|repo_id|score        |source|\n\/\/ +-------+-------+-------------+------+\n\/\/ |2142   |48     |0.05021151   |als   |\n\/\/ |2142   |118    |0.05021151   |als   |\n\/\/ |2142   |68     |0.7791124    |als   |\n\/\/ |2142   |98     |0.9939307    |als   |\n\/\/ |2142   |28     |0.53719014   |als   |\n\/\/ +-------+-------+-------------+------+<\/code><\/pre>\n<h2>Define Conditional Columns<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-scala\">import org.apache.spark.sql.Column\nimport org.apache.spark.sql.functions._\n\nval workingStatusProjection: Column = when($\"telfs\" &gt;= 1 &amp;&amp; $\"telfs\" &lt; 3, \"working\").\n                                      otherwise(\"not working\").\n                                      as(\"working\")\n\nval ageProjection: Column = when($\"teage\" &gt;= 15 &amp;&amp; $\"teage\" &lt;= 22, \"young\").\n                            when($\"teage\" &gt;= 23 &amp;&amp; $\"teage\" &lt;= 55, \"active\").\n                            otherwise(\"elder\").\n                            as(\"age\")\n\n\/\/ workColumns is a Seq[Column] of per-activity minute columns defined elsewhere\nval workProjection: Column = workColumns.reduce(_+_).divide(60).as(\"work_hours\")\n\nval newDF = df.select(workingStatusProjection, ageProjection, workProjection)\n\/\/ +-----------+------+------------------+\n\/\/ |    working|   age|        work_hours|\n\/\/ +-----------+------+------------------+\n\/\/ |    working| elder|               0.0|\n\/\/ |    working|active|               0.0|\n\/\/ |    working|active|               0.0|\n\/\/ |not working|active|               2.0|\n\/\/ |    working|active| 8.583333333333334|\n\/\/ +-----------+------+------------------+<\/code><\/pre>\n<h2>Create a Custom Transformation Function for any DataFrame<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-scala\">import org.apache.spark.sql.DataFrame\nimport org.apache.spark.sql.functions.lit\n\ndef withGreeting(df: DataFrame): DataFrame = {\n  df.withColumn(\"greeting\", lit(\"hello world\"))\n}\n\ndef withCat(name: String)(df: DataFrame): DataFrame = {\n  df.withColumn(\"cat\", lit(s\"$name meow\"))\n}\n\nval df = Seq(\n  \"funny\",\n  \"person\"\n).toDF(\"something\")\n\nval weirdDf = df\n  .transform(withGreeting)\n  .transform(withCat(\"kitten\"))\n\/\/ +---------+-----------+-----------+\n\/\/ |something|   greeting|        cat|\n\/\/ +---------+-----------+-----------+\n\/\/ |    funny|hello world|kitten meow|\n\/\/ |   person|hello world|kitten meow|\n\/\/ +---------+-----------+-----------+<\/code><\/pre>\n<p>ref:<br \/>\n<a href=\"https:\/\/medium.com\/@mrpowers\/chaining-custom-dataframe-transformations-in-spark-a39e315f903c\">https:\/\/medium.com\/@mrpowers\/chaining-custom-dataframe-transformations-in-spark-a39e315f903c<\/a><\/p>\n<h2>Combine Two Arrays from Two DataFrames<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-scala\">import org.apache.spark.sql.Row\n\n\/\/ $(k) reads the value of a Param named k, so this snippet is meant to live inside a Spark ML class\nval bothItemsRDD = userPredictedItemsDF.join(userActualItemsDF, Seq(\"user_id\"))\n  .select(userPredictedItemsDF.col(\"items\"), userActualItemsDF.col(\"items\"))\n  .rdd\n  .map({\n    case Row(userPredictedItems: Seq[Int], userActualItems: Seq[Int]) =&gt; {\n      (userPredictedItems.slice(0, $(k)).toArray, userActualItems.slice(0, $(k)).toArray)
\n    }\n  })<\/code><\/pre>\n<p>ref:<br \/>\n<a href=\"https:\/\/stackoverflow.com\/questions\/28166555\/how-to-convert-row-of-a-scala-dataframe-into-case-class-most-efficiently\">https:\/\/stackoverflow.com\/questions\/28166555\/how-to-convert-row-of-a-scala-dataframe-into-case-class-most-efficiently<\/a><\/p>\n<h2>Handle Missing Values (NaN and Null)<\/h2>\n<p><code>NaN<\/code> stands for &quot;Not a Number&quot;; it's usually the result of a mathematical operation that doesn't make sense, e.g. <code>0.0\/0.0<\/code>.<\/p>\n<pre class=\"line-numbers\"><code class=\"language-scala\">val df1 = repoInfoDF.na.fill(\"\")\nval df2 = repoInfoDF.na.fill(\"\", Seq(\"description\", \"homepage\"))<\/code><\/pre>\n<p>ref:<br \/>\n<a href=\"https:\/\/stackoverflow.com\/questions\/33089781\/spark-dataframe-handing-empty-string-in-onehotencoder\">https:\/\/stackoverflow.com\/questions\/33089781\/spark-dataframe-handing-empty-string-in-onehotencoder<\/a><br \/>\n<a href=\"https:\/\/stackoverflow.com\/questions\/43882699\/which-differences-there-are-between-null-and-nan-in-spark-how-to-deal-with\">https:\/\/stackoverflow.com\/questions\/43882699\/which-differences-there-are-between-null-and-nan-in-spark-how-to-deal-with<\/a><\/p>\n<h2>Calculate the Difference between Two Dates<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-scala\">import org.apache.spark.sql.functions._\n\ndf.withColumn(\"updated_at_days_since_today\", datediff(current_date(), $\"updated_at\"))<\/code><\/pre>\n<p>ref:<br \/>\n<a href=\"https:\/\/databricks.com\/blog\/2015\/09\/16\/apache-spark-1-5-dataframe-api-highlights.html\">https:\/\/databricks.com\/blog\/2015\/09\/16\/apache-spark-1-5-dataframe-api-highlights.html<\/a><\/p>\n<h2>Create a User-Defined Function (UDF) which Accepts One Column<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-scala\">import org.apache.spark.sql.functions.udf\n\nval df = Seq(\n  (1, \"Company Name Inc.\"),\n  (2, \"Company Name Co.\"),\n  (3, \"Company Name \")\n).toDF(\"id\", \"company\")\n\n\/\/ use replace() for literal matches: replaceAll() would treat these patterns as regexes\nval removeUninformativeWords = (text: String) =&gt; {\n  text\n    .toLowerCase()\n    .replace(\", inc.\", \"\")\n    .replace(\" inc.\", \"\")\n    .replace(\"-inc\", \"\")\n    .replace(\" co., ltd.\", \"\")\n    .replace(\" co.\", \"\")\n    .replace(\" ltd.\", \"\")\n    .replace(\".com\", \"\")\n    .replace(\"@\", \"\")\n    .trim()\n}\nval removeUninformativeWordsUDF = udf(removeUninformativeWords)\n\nval newDF = df.withColumn(\"fix_company\", removeUninformativeWordsUDF($\"company\"))\nnewDF.show()<\/code><\/pre>\n<p>ref:<br \/>\n<a href=\"https:\/\/jaceklaskowski.gitbooks.io\/mastering-apache-spark\/spark-sql-udfs.html\">https:\/\/jaceklaskowski.gitbooks.io\/mastering-apache-spark\/spark-sql-udfs.html<\/a><\/p>\n<h2>Create a User-Defined Function (UDF) which Accepts Multiple Columns<\/h2>\n<p>When defining a UDF, you must map a <code>StructType<\/code> input with <code>Row<\/code>, not with a custom case class.<\/p>\n<pre class=\"line-numbers\"><code class=\"language-scala\">import org.apache.spark.sql.functions.udf\nimport org.apache.spark.sql.Row\n\nuserRecommendationsDF.printSchema()\n\/\/ root\n\/\/ |-- user: integer (nullable = false)\n\/\/ |-- recommendations: array (nullable = true)\n\/\/ |    |-- element: struct (containsNull = true)\n\/\/ |    |    |-- item: integer (nullable = true)\n\/\/ |    |    |-- rating: float (nullable = true)\n\nval extractItemsUDF = udf((user: Int, recommendations: Seq[Row]) =&gt; {\n  val items: Seq[Int] =
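\n    \/\/ each element of recommendations is a Row backing an (item, rating) struct,\n    \/\/ so pull the fields out by position with getInt \/ getFloat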
\n    recommendations.map(_.getInt(0))\n  val ratings: Seq[Float] = recommendations.map(_.getFloat(1))\n  items\n})\nuserRecommendationsDF.withColumn(\"items\", extractItemsUDF($\"user\", $\"recommendations\"))\n\/\/ +----+-----------------------------------------------------------------+------------+\n\/\/ |user|recommendations                                                  |items       |\n\/\/ +----+-----------------------------------------------------------------+------------+\n\/\/ |1   |[[2,0.9997897], [4,0.9988761], [1,0.9988487], [5,0.0]]           |[2, 4, 1, 5]|\n\/\/ |3   |[[5,0.9984464], [1,2.9802322E-8], [2,0.0], [4,0.0]]              |[5, 1, 2, 4]|\n\/\/ |2   |[[4,0.9921428], [2,0.10759391], [1,0.10749264], [5,1.4901161E-8]]|[4, 2, 1, 5]|\n\/\/ +----+-----------------------------------------------------------------+------------+<\/code><\/pre>\n<p>ref:<br \/>\n<a href=\"https:\/\/stackoverflow.com\/questions\/32196207\/derive-multiple-columns-from-a-single-column-in-a-spark-dataframe\">https:\/\/stackoverflow.com\/questions\/32196207\/derive-multiple-columns-from-a-single-column-in-a-spark-dataframe<\/a><\/p>\n<p>The <code>GenericRowWithSchema cannot be cast to XXX<\/code> issue:<\/p>\n<p>There is a strict mapping between Spark SQL data types and Scala types, such as <code>IntegerType<\/code> vs. <code>Int<\/code>, <code>TimestampType<\/code> vs. <code>java.sql.Timestamp<\/code> and <code>StructType<\/code> vs. <code>org.apache.spark.sql.Row<\/code>.<\/p>\n<p>ref:<br \/>\n<a href=\"https:\/\/stackoverflow.com\/questions\/38413040\/spark-sql-udf-with-complex-input-parameter\">https:\/\/stackoverflow.com\/questions\/38413040\/spark-sql-udf-with-complex-input-parameter<\/a><br \/>\n<a href=\"https:\/\/spark.apache.org\/docs\/latest\/sql-programming-guide.html#data-types\">https:\/\/spark.apache.org\/docs\/latest\/sql-programming-guide.html#data-types<\/a><\/p>\n<h2>Create a User-Defined Function (UDF) with Extra Parameters<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-scala\">import org.apache.spark.sql.expressions.UserDefinedFunction\nimport org.apache.spark.sql.functions._\nimport scala.util.control.Breaks._\n\nval df = spark.createDataFrame(Seq(\n  (\"Bob\", 35, \"Web Backend Developer\"),\n  (\"Cathy\", 24, \"UX Designer\"),\n  (\"Vic\", 26, \"PM\"),\n  (\"Tom\", 29, \"Back end Engineer\")\n)).toDF(\"name\", \"age\", \"occupation\")\n\n\/\/ the return type of this UDF is Double\ndef containsAnyOfUDF(substrings: Array[String]): UserDefinedFunction = udf[Double, String]((text: String) =&gt; {\n  var result = 0.0\n  breakable {\n    for (substring &lt;- substrings) {\n      if (text.contains(substring)) {\n        result = 1.0\n        break\n      }\n    }\n  }\n  result\n})\n\nval backends = Array(\"backend\", \"back end\")\ndf.withColumn(\"knows_backend\", containsAnyOfUDF(backends)(lower($\"occupation\")))\n\/\/ +-----+---+---------------------+-------------+\n\/\/ |name |age|occupation           |knows_backend|\n\/\/ +-----+---+---------------------+-------------+\n\/\/ |Bob  |35 |Web Backend Developer|1.0          |\n\/\/ |Cathy|24 |UX Designer          |0.0          |\n\/\/ |Vic  |26 |PM                   |0.0          |\n\/\/ |Tom  |29 |Back end Engineer    |1.0          |\n\/\/ +-----+---+---------------------+-------------+<\/code><\/pre>\n<p>ref:<br \/>
href=\"https:\/\/stackoverflow.com\/questions\/35546576\/how-can-i-pass-extra-parameters-to-udfs-in-sparksql\">https:\/\/stackoverflow.com\/questions\/35546576\/how-can-i-pass-extra-parameters-to-udfs-in-sparksql<\/a><\/p>\n<h2>Create a User-Defined Function (UDF) which Accepts Columns<\/h2>\n<p>ref:<br \/>\n<a href=\"https:\/\/medium.com\/@mrpowers\/spark-user-defined-functions-udfs-6c849e39443b\">https:\/\/medium.com\/@mrpowers\/spark-user-defined-functions-udfs-6c849e39443b<\/a><\/p>\n<h2>Select Rows that Contain Certain String in a DataFrame<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-scala\">userInfoDF\n  .where(\"company LIKE '%Inc%'\")\n  .orderBy($\"company\".desc)<\/code><\/pre>\n<h2>Order By One of the Elements in a List Column<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-scala\">import org.apache.spark.ml.linalg.{Vector, Vectors}\n\nval to_array = udf((v: Vector) =&gt; v.toDense.values)\n\nresultTestDF\n  .where(\"user_id = 652070\")\n  .orderBy(to_array($\"probability\").getItem(1).desc)\n  .select(\"user_id\", \"repo_id\", \"starring\", \"prediction\", \"probability\")\n  .show(false)<\/code><\/pre>\n<h2>Show Distinct Values<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-scala\">df.select($\"company\").distinct().orderBy($\"company\".desc)\n\nimport org.apache.spark.sql.functions.approx_count_distinct\n\nimputedUserInfoDF.select(approx_count_distinct($\"company\")).show()\n\/\/ 39581<\/code><\/pre>\n<p>ref:<br \/>\n<a href=\"https:\/\/databricks.com\/blog\/2016\/05\/19\/approximate-algorithms-in-apache-spark-hyperloglog-and-quantiles.html\">https:\/\/databricks.com\/blog\/2016\/05\/19\/approximate-algorithms-in-apache-spark-hyperloglog-and-quantiles.html<\/a><\/p>\n<h2>Group By then Aggregation<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-scala\">import org.apache.spark.sql.functions.count\n\nval df = userInfoDF\n  .groupBy($\"company\")\n  .agg(count(\"user_id\").alias(\"count\"))\n  .orderBy($\"count\".desc)\n  .limit(1000)<\/code><\/pre>\n<h2>Group By then Get First\/Top n Items<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-scala\">import org.apache.spark.sql.expressions.Window\nimport org.apache.spark.sql.functions.rank\n\nval topN = 10\nval window = Window.partitionBy($\"user_id\").orderBy($\"starred_at\".desc)\nval userLanguagesDF = repoInfoStarringDF\n  .withColumn(\"rank\", rank.over(window))\n  .where($\"rank\" &lt;= topN)\n  .groupBy($\"user_id\")\n  .agg(collect_list($\"language\").alias(\"languages\"))\n\/\/ +--------+-----------------------------------------------------------------+\n\/\/ |user_id |languages                                                        |\n\/\/ +--------+-----------------------------------------------------------------+\n\/\/ |2142    |[JavaScript, Python, Python, Ruby, Go]                           |\n\/\/ |59990   |[Python, JavaScript, Go, JavaScript, Go]                         |\n\/\/ |101094  |[JavaScript, C++, Max, C, JavaScript]                            |\n\/\/ |109622  |[PHP, PHP, PHP, PHP, Python]                                     |\n\/\/ |201694  |[TypeScript, Python, Vim script, Vim script, Python]             |\n\/\/ +--------+-----------------------------------------------------------------+<\/code><\/pre>\n<p>ref:<br \/>\n<a href=\"https:\/\/stackoverflow.com\/questions\/33655467\/get-topn-of-all-groups-after-group-by-using-spark-dataframe\">https:\/\/stackoverflow.com\/questions\/33655467\/get-topn-of-all-groups-after-group-by-using-spark-dataframe<\/a><br 
\n<h2>Select Rows that Contain Certain String in a DataFrame<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-scala\">userInfoDF\n  .where(\"company LIKE '%Inc%'\")\n  .orderBy($\"company\".desc)<\/code><\/pre>\n<h2>Order By One of the Elements in a List Column<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-scala\">import org.apache.spark.ml.linalg.{Vector, Vectors}\n\nval to_array = udf((v: Vector) =&gt; v.toDense.values)\n\nresultTestDF\n  .where(\"user_id = 652070\")\n  .orderBy(to_array($\"probability\").getItem(1).desc)\n  .select(\"user_id\", \"repo_id\", \"starring\", \"prediction\", \"probability\")\n  .show(false)<\/code><\/pre>\n<h2>Show Distinct Values<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-scala\">df.select($\"company\").distinct().orderBy($\"company\".desc)\n\nimport org.apache.spark.sql.functions.approx_count_distinct\n\nimputedUserInfoDF.select(approx_count_distinct($\"company\")).show()\n\/\/ 39581<\/code><\/pre>\n<p>ref:<br \/>\n<a href=\"https:\/\/databricks.com\/blog\/2016\/05\/19\/approximate-algorithms-in-apache-spark-hyperloglog-and-quantiles.html\">https:\/\/databricks.com\/blog\/2016\/05\/19\/approximate-algorithms-in-apache-spark-hyperloglog-and-quantiles.html<\/a><\/p>\n<h2>Group By then Aggregation<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-scala\">import org.apache.spark.sql.functions.count\n\nval df = userInfoDF\n  .groupBy($\"company\")\n  .agg(count(\"user_id\").alias(\"count\"))\n  .orderBy($\"count\".desc)\n  .limit(1000)<\/code><\/pre>\n<h2>Group By then Get First\/Top n Items<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-scala\">import org.apache.spark.sql.expressions.Window\nimport org.apache.spark.sql.functions.{collect_list, rank}\n\nval topN = 10\nval window = Window.partitionBy($\"user_id\").orderBy($\"starred_at\".desc)\nval userLanguagesDF = repoInfoStarringDF\n  .withColumn(\"rank\", rank.over(window))\n  .where($\"rank\" &lt;= topN)\n  .groupBy($\"user_id\")\n  .agg(collect_list($\"language\").alias(\"languages\"))\n\/\/ +--------+-----------------------------------------------------------------+\n\/\/ |user_id |languages                                                        |\n\/\/ +--------+-----------------------------------------------------------------+\n\/\/ |2142    |[JavaScript, Python, Python, Ruby, Go]                           |\n\/\/ |59990   |[Python, JavaScript, Go, JavaScript, Go]                         |\n\/\/ |101094  |[JavaScript, C++, Max, C, JavaScript]                            |\n\/\/ |109622  |[PHP, PHP, PHP, PHP, Python]                                     |\n\/\/ |201694  |[TypeScript, Python, Vim script, Vim script, Python]             |\n\/\/ +--------+-----------------------------------------------------------------+<\/code><\/pre>\n<p>ref:<br \/>\n<a href=\"https:\/\/stackoverflow.com\/questions\/33655467\/get-topn-of-all-groups-after-group-by-using-spark-dataframe\">https:\/\/stackoverflow.com\/questions\/33655467\/get-topn-of-all-groups-after-group-by-using-spark-dataframe<\/a><br \/>\n<a href=\"https:\/\/stackoverflow.com\/questions\/35918262\/spark-topn-after-groupby\">https:\/\/stackoverflow.com\/questions\/35918262\/spark-topn-after-groupby<\/a><\/p>\n<p>or<\/p>\n<pre class=\"line-numbers\"><code class=\"language-scala\">import org.apache.spark.sql.expressions.Window\nimport org.apache.spark.sql.functions.row_number\n\nval df = spark.createDataFrame(Seq(\n  (2, 1, 0, \"2017-05-18 22:01:00.0\"),\n  (1, 1, 1, \"2017-05-16 20:01:00.0\"),\n  (2, 5, 0, \"2017-05-21 22:01:00.0\"),\n  (1, 2, 1, \"2017-05-17 21:01:00.0\"),\n  (1, 4, 1, \"2017-05-17 22:01:00.0\"),\n  (1, 7, 1, \"2017-05-12 22:01:00.0\"),\n  (3, 1, 1, \"2017-05-19 22:01:00.0\"),\n  (3, 2, 1, \"2017-05-30 22:01:00.0\"),\n  (3, 5, 1, \"2017-05-01 22:01:00.0\"),\n  (1, 6, 1, \"2017-05-19 22:01:00.0\")\n)).toDF(\"user_id\", \"repo_id\", \"starring\", \"starred_at\")\n\nval windowSpec = Window.partitionBy($\"user_id\").orderBy($\"starred_at\".desc)\nval userActualItemsDF = df\n  .withColumn(\"row_number\", row_number().over(windowSpec))\n  .where($\"row_number\" &lt;= 3)\n\/\/ +-------+-------+--------+---------------------+----------+\n\/\/ |user_id|repo_id|starring|starred_at           |row_number|\n\/\/ +-------+-------+--------+---------------------+----------+\n\/\/ |1      |6      |1       |2017-05-19 22:01:00.0|1         |\n\/\/ |1      |4      |1       |2017-05-17 22:01:00.0|2         |\n\/\/ |1      |2      |1       |2017-05-17 21:01:00.0|3         |\n\/\/ |3      |2      |1       |2017-05-30 22:01:00.0|1         |\n\/\/ |3      |1      |1       |2017-05-19 22:01:00.0|2         |\n\/\/ |3      |5      |1       |2017-05-01 22:01:00.0|3         |\n\/\/ |2      |5      |0       |2017-05-21 22:01:00.0|1         |\n\/\/ |2      |1      |0       |2017-05-18 22:01:00.0|2         |\n\/\/ +-------+-------+--------+---------------------+----------+<\/code><\/pre>\n<p>ref:<br \/>\n<a href=\"https:\/\/stackoverflow.com\/questions\/33878370\/how-to-select-the-first-row-of-each-group\">https:\/\/stackoverflow.com\/questions\/33878370\/how-to-select-the-first-row-of-each-group<\/a><br \/>\n<a href=\"http:\/\/xinhstechblog.blogspot.tw\/2016\/04\/spark-window-functions-for-dataframes.html\">http:\/\/xinhstechblog.blogspot.tw\/2016\/04\/spark-window-functions-for-dataframes.html<\/a><\/p>\n<h2>Group By then Concatenate Strings<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-scala\">import org.apache.spark.sql.expressions.Window\nimport org.apache.spark.sql.functions._\n\nval userTopicsDF = repoInfoStarringDF\n  .where($\"topics\" =!= \"\")\n  .withColumn(\"rank\", rank.over(Window.partitionBy($\"user_id\").orderBy($\"starred_at\".desc)))\n  .where($\"rank\" &lt;= 50)\n  .groupBy($\"user_id\")\n  .agg(concat_ws(\",\", collect_list($\"topics\")).alias(\"topics_preferences\"))\n\/\/ +--------+-------------------------------------------------------------------+\n\/\/ | user_id|                                                 topics_preferences|\n\/\/ +--------+-------------------------------------------------------------------+\n\/\/ |    2142|go,golang,grpc,microservice,protobuf,consul,go,golang,load-balancer|\n\/\/ |   59990|api,api-gateway,aws,aws-infrastructure,aws-lambda,deploy-tool      |\n\/\/ |  101094|ableton,javascript,maxmsp,c,c89,gui,imgui,nuklear                  |\n\/\/ |  109622|stripe,algolia,magento,magento-algoliasearch,php,search,aws        |\n\/\/ +--------+-------------------------------------------------------------------+<\/code><\/pre>\n<p>ref:<br \/>
href=\"https:\/\/community.hortonworks.com\/questions\/44886\/dataframe-groupby-and-concat-non-empty-strings.html\">https:\/\/community.hortonworks.com\/questions\/44886\/dataframe-groupby-and-concat-non-empty-strings.html<\/a><\/p>\n<h2>Group By after Order By Maintains Order<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-scala\">val list = Seq(\n  (2, 1, 0, \"2017-05-18 22:01:00.0\"),\n  (1, 1, 1, \"2017-05-16 20:01:00.0\"),\n  (2, 5, 0, \"2017-05-21 22:01:00.0\"),\n  (1, 2, 1, \"2017-05-17 21:01:00.0\"),\n  (1, 4, 1, \"2017-05-17 22:01:00.0\"),\n  (3, 1, 1, \"2017-05-19 22:01:00.0\"),\n  (3, 2, 1, \"2017-05-30 22:01:00.0\"),\n  (3, 5, 1, \"2017-05-01 22:01:00.0\"),\n  (1, 6, 1, \"2017-05-19 22:01:00.0\")\n)\nval df = list.toDF(\"user_id\", \"repo_id\", \"starring\", \"starred_at\")\n\nval userActualItemsDF = df\n  .orderBy($\"starred_at\".desc)\n  .groupBy($\"user_id\")\n  .agg(collect_list($\"repo_id\").alias(\"items\"))\n\/\/ +-------+------------+\n\/\/ |user_id|       items|\n\/\/ +-------+------------+\n\/\/ |      1|[6, 4, 2, 1]|\n\/\/ |      3|   [2, 1, 5]|\n\/\/ |      2|      [5, 1]|\n\/\/ +-------+------------+<\/code><\/pre>\n<p>ref:<br \/>\n<a href=\"https:\/\/stackoverflow.com\/questions\/39505599\/spark-dataframe-does-groupby-after-orderby-maintain-that-order\">https:\/\/stackoverflow.com\/questions\/39505599\/spark-dataframe-does-groupby-after-orderby-maintain-that-order<\/a><\/p>\n<h2>collect_list Multiple Columns<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-scala\">import org.apache.spark.sql.functions.{collect_list, struct}\n\npredictionDF\n\/\/ +----+----+------+----------+\n\/\/ |user|item|rating|prediction|\n\/\/ +----+----+------+----------+\n\/\/ |   1|   1|    12| 0.9988487|\n\/\/ |   3|   5|     8| 0.9984464|\n\/\/ |   1|   4|     4|0.99887615|\n\/\/ |   2|   4|     1| 0.9921428|\n\/\/ |   1|   2|    90| 0.9997897|\n\/\/ +----+----+------+----------+\n\nval userRecommendationsDF = predictionDF\n  .groupBy($\"user\")\n  .agg(collect_list(struct($\"item\", $\"prediction\")).alias(\"recommendations\"))\n\/\/ +----+----------------------------------------------+\n\/\/ |user|recommendations                               |\n\/\/ +----+----------------------------------------------+\n\/\/ |1   |[[1,0.9988487], [4,0.99887615], [2,0.9997897]]|\n\/\/ |3   |[[5,0.9984464]]                               |\n\/\/ |2   |[[4,0.9921428]]                               |\n\/\/ +----+----------------------------------------------+<\/code><\/pre>\n<h2>groupByKey() on Dataset<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-scala\">val keyValues = List(\n  (3, \"Me\"), (1, \"Thi\"), (2, \"Se\"), (3, \"ssa\"), (1, \"sIsA\"), (3, \"ge:\"), (3, \"-\"), (2, \"cre\"), (2, \"t\")\n)\nval keyValuesDS = keyValues.toDS\nval ds = keyValuesDS\n  .groupByKey(P =&gt; P._1)\n  .mapValues(p =&gt; p._2)\n  .reduceGroups((acc, str) =&gt; acc + str)<\/code><\/pre>\n<p>ref:<br \/>\n<a href=\"https:\/\/www.coursera.org\/learn\/scala-spark-big-data\/lecture\/yrfPh\/datasets\">https:\/\/www.coursera.org\/learn\/scala-spark-big-data\/lecture\/yrfPh\/datasets<\/a><\/p>\n<pre class=\"line-numbers\"><code class=\"language-scala\">case class TimeUsageRow(\n  working: String,\n  sex: String,\n  age: String,\n  primaryNeeds: Double,\n  work: Double,\n  other: Double\n)\n\nval summaryDs = doShit()\n\/\/ +-----------+------+------+------------------+------------------+------------------+\n\/\/ |    working|   sex|   age|      primaryNeeds|              work|             
\n\/\/ |    working|   sex|   age|      primaryNeeds|              work|             other|\n\/\/ +-----------+------+------+------------------+------------------+------------------+\n\/\/ |    working|  male| elder|             15.25|               0.0|              8.75|\n\/\/ |    working|female|active|13.833333333333334|               0.0|10.166666666666666|\n\/\/ |    working|female|active|11.916666666666666|               0.0|12.083333333333334|\n\/\/ |not working|female|active|13.083333333333334|               2.0| 8.916666666666666|\n\/\/ |    working|  male|active|11.783333333333333| 8.583333333333334|3.6333333333333333|\n\/\/ +-----------+------+------+------------------+------------------+------------------+\n\nimport org.apache.spark.sql.Encoders\nimport org.apache.spark.sql.expressions.scalalang.typed\nimport org.apache.spark.sql.functions.round\n\nval finalDs = summaryDs\n  .groupByKey((row: TimeUsageRow) =&gt; {\n    (row.working, row.sex, row.age)\n  })\n  .agg(\n    round(typed.avg[TimeUsageRow](_.primaryNeeds), 1).as(Encoders.DOUBLE),\n    round(typed.avg[TimeUsageRow](_.work), 1).as(Encoders.DOUBLE),\n    round(typed.avg[TimeUsageRow](_.other), 1).as(Encoders.DOUBLE)\n  )\n  .map({\n    case ((working, sex, age), primaryNeeds, work, other) =&gt; {\n      TimeUsageRow(working, sex, age, primaryNeeds, work, other)\n    }\n  })\n  .orderBy(\"working\", \"sex\", \"age\")\n  .as[TimeUsageRow]\n\/\/ +-----------+------+------+------------+----+-----+\n\/\/ |    working|   sex|   age|primaryNeeds|work|other|\n\/\/ +-----------+------+------+------------+----+-----+\n\/\/ |not working|female|active|        12.4| 0.5| 10.8|\n\/\/ |not working|female| elder|        10.9| 0.4| 12.4|\n\/\/ |not working|female| young|        12.5| 0.2| 11.1|\n\/\/ |not working|  male|active|        11.4| 0.9| 11.4|\n\/\/ |not working|  male| elder|        10.7| 0.7| 12.3|\n\/\/ +-----------+------+------+------------+----+-----+<\/code><\/pre>\n<p>ref:<br \/>\n<a href=\"https:\/\/stackoverflow.com\/questions\/39946210\/spark-2-0-datasets-groupbykey-and-divide-operation-and-type-safety\">https:\/\/stackoverflow.com\/questions\/39946210\/spark-2-0-datasets-groupbykey-and-divide-operation-and-type-safety<\/a><\/p>\n<h2>Aggregator<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-scala\">import org.apache.spark.sql.Encoder\nimport org.apache.spark.sql.expressions.Aggregator\n\nval myAgg = new Aggregator[IN, BUF, OUT] {\n  def zero: BUF = ??? \/\/ the initial value\n  def reduce(b: BUF, a: IN): BUF = ??? \/\/ add an element to the running total\n  def merge(b1: BUF, b2: BUF): BUF = ??? \/\/ merge intermediate values\n  def finish(b: BUF): OUT = ??? \/\/ return the final result
\n  override def bufferEncoder: Encoder[BUF] = ???\n  override def outputEncoder: Encoder[OUT] = ???\n}.toColumn<\/code><\/pre>\n<p>ref:<br \/>\n<a href=\"https:\/\/www.coursera.org\/learn\/scala-spark-big-data\/lecture\/yrfPh\/datasets\">https:\/\/www.coursera.org\/learn\/scala-spark-big-data\/lecture\/yrfPh\/datasets<\/a><\/p>\n<h2>Join Two DataFrames<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-scala\">val starringDF = spark.createDataFrame(Seq(\n    (652070, 21289110, 1),\n    (652070, 4164482, 1),\n    (652070, 44838949, 0),\n    (1912583, 4164482, 0)\n)).toDF(\"user_id\", \"repo_id\", \"starring\")\n\nval repoDF = spark.createDataFrame(Seq(\n    (21289110, \"awesome-python\", \"Python\", 37336),\n    (4164482, \"django\", \"Python\", 27451),\n    (44838949, \"swift\", \"C++\", 39840)\n)).toDF(\"id\", \"repo_name\", \"repo_language\", \"stargazers_count\")\n\nval fullDF = starringDF.join(repoDF, starringDF.col(\"repo_id\") === repoDF.col(\"id\"))\n\/\/ +-------+--------+--------+--------+--------------+-------------+----------------+\n\/\/ |user_id| repo_id|starring|      id|     repo_name|repo_language|stargazers_count|\n\/\/ +-------+--------+--------+--------+--------------+-------------+----------------+\n\/\/ | 652070|21289110|       1|21289110|awesome-python|       Python|           37336|\n\/\/ | 652070| 4164482|       1| 4164482|        django|       Python|           27451|\n\/\/ | 652070|44838949|       0|44838949|         swift|          C++|           39840|\n\/\/ |1912583| 4164482|       0| 4164482|        django|       Python|           27451|\n\/\/ +-------+--------+--------+--------+--------------+-------------+----------------+<\/code><\/pre>\n<p>ref:<br \/>\n<a href=\"https:\/\/docs.databricks.com\/spark\/latest\/faq\/join-two-dataframes-duplicated-column.html\">https:\/\/docs.databricks.com\/spark\/latest\/faq\/join-two-dataframes-duplicated-column.html<\/a><\/p>\n<h2>Broadcast Join<\/h2>\n<p>ref:<br \/>\n<a href=\"https:\/\/docs.cloud.databricks.com\/docs\/latest\/databricks_guide\/04%20SQL%2C%20DataFrames%20%26%20Datasets\/05%20BroadcastHashJoin%20-%20scala.html\">https:\/\/docs.cloud.databricks.com\/docs\/latest\/databricks_guide\/04%20SQL%2C%20DataFrames%20%26%20Datasets\/05%20BroadcastHashJoin%20-%20scala.html<\/a><\/p>
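\n<p>A minimal sketch, reusing the two DataFrames from the join example above: when one side is small enough to fit in every executor's memory, the <code>broadcast()<\/code> hint ships it to all executors and avoids shuffling the large side.<\/p>\n<pre class=\"line-numbers\"><code class=\"language-scala\">import org.apache.spark.sql.functions.broadcast\n\n\/\/ hint Spark to broadcast the small repoDF instead of shuffling both sides\nval fullDF = starringDF.join(broadcast(repoDF), starringDF.col(\"repo_id\") === repoDF.col(\"id\"))\n\nfullDF.explain()\n\/\/ the physical plan should contain a BroadcastHashJoin node<\/code><\/pre>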
\n<h2>Union Multiple DataFrames<\/h2>\n<pre class=\"line-numbers\"><code class=\"language-scala\">val candidateDF = recommenders\n  .map((recommender: Recommender) =&gt; recommender.recommendForUsers(testUserDF))\n  .reduce(_ union _)\n  .select($\"user_id\", $\"repo_id\")<\/code><\/pre>\n<p>ref:<br \/>\n<a href=\"https:\/\/stackoverflow.com\/questions\/37612622\/spark-unionall-multiple-dataframes\">https:\/\/stackoverflow.com\/questions\/37612622\/spark-unionall-multiple-dataframes<\/a><\/p>\n","protected":false},"excerpt":{"rendered":"<p>Scala is the first-class language for interacting with Apache Spark, but it has a steep learning curve. This article is mostly about operating on DataFrames and Datasets in Spark SQL.<\/p>\n","protected":false},"author":1,"featured_media":430,"comment_status":"closed","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[97,112],"tags":[108,109],"class_list":["post-429","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","category-about-ai","category-about-big-data","tag-apache-spark","tag-scala"],"_links":{"self":[{"href":"https:\/\/vinta.ws\/code\/wp-json\/wp\/v2\/posts\/429","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/vinta.ws\/code\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/vinta.ws\/code\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/vinta.ws\/code\/wp-json\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/vinta.ws\/code\/wp-json\/wp\/v2\/comments?post=429"}],"version-history":[{"count":0,"href":"https:\/\/vinta.ws\/code\/wp-json\/wp\/v2\/posts\/429\/revisions"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/vinta.ws\/code\/wp-json\/wp\/v2\/media\/430"}],"wp:attachment":[{"href":"https:\/\/vinta.ws\/code\/wp-json\/wp\/v2\/media?parent=429"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/vinta.ws\/code\/wp-json\/wp\/v2\/categories?post=429"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/vinta.ws\/code\/wp-json\/wp\/v2\/tags?post=429"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}