Spark SQL cookbook (Scala)

Scala is the first-class language for interacting with Apache Spark, but it can be difficult to learn. This article is mostly about working with DataFrames and Datasets in Spark SQL.

Access SparkSession

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val conf = new SparkConf()
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

implicit val spark: SparkSession = SparkSession
  .builder()
  .appName("PersonalizedRanker")
  .config(conf)
  .getOrCreate()

val sc = spark.sparkContext
sc.setLogLevel("WARN")

Before Spark 2.0:

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

val conf: SparkConf = new SparkConf()
  .setAppName("PersonalizedRanker")
  .setMaster("local[*]")

val sc: SparkContext = new SparkContext(conf)

ref:
https://sparkour.urizone.net/recipes/understanding-sparksession/
https://spark.apache.org/docs/latest/configuration.html

Import Implicits

val spark = SparkSession.builder().getOrCreate()

import spark.implicits._

// you could also access SparkSession via any Dataset or DataFrame
import someDS.sparkSession.implicits._
import someDF.sparkSession.implicits._

val ratingDF = rawDF
  .selectExpr("from_user_id AS user", "repo_id AS item", "1 AS rating", "starred_at")
  .orderBy($"user", $"starred_at".desc)

Read Data from MySQL

import java.util.Properties

val dbUrl = "jdbc:mysql://mysql:3306/albedo?user=root&password=123&verifyServerCertificate=false&useSSL=false"
val prop = new Properties()
prop.put("driver", "com.mysql.jdbc.Driver")
val rawDF = spark.read.jdbc(dbUrl, "app_repostarring", prop)

ref:
https://docs.databricks.com/spark/latest/data-sources/sql-databases.html

Read Data from an Apache Parquet File

import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.functions.lit

import spark.implicits._

case class RepoStarring(
  user_id: Int,
  repo_id: Int,
  starred_at: java.sql.Timestamp,
  starring: Double
)

val dataDir = "."
val today = "20170820"

val savePath = s"$dataDir/spark-data/$today/repoStarringDF.parquet"
val df: DataFrame = try {
  spark.read.parquet(savePath)
} catch {
  case e: AnalysisException => {
    if (e.getMessage().contains("Path does not exist")) {
      // dbUrl and prop are defined in the "Read Data from MySQL" section above
      val df = spark.read.jdbc(dbUrl, "app_repostarring", prop)
        .select("user_id", "repo_id", "starred_at")
        .withColumn("starring", lit(1.0))
      df.write.mode("overwrite").parquet(savePath)
      df
    } else {
      throw e
    }
  }
}
df.as[RepoStarring]

Read Data from an Apache Avro File

$ spark-submit --packages "com.databricks:spark-avro_2.11:3.2.0"

import com.databricks.spark.avro._
import org.apache.spark.sql.SparkSession

implicit val spark = SparkSession
  .builder()
  .appName("Word2VecTrainer")
  .getOrCreate()

val df = spark.read.avro("./githubarchive_repo_info.avro")
df.show()

ref:
https://github.com/databricks/spark-avro

Create an RDD from a List of Case Class Instances

import org.apache.spark.rdd.RDD

case class WikipediaArticle(title: String, text: String) {
  def mentionsLanguage(lang: String): Boolean = text.split(' ').contains(lang)
}

val wikiRdd: RDD[WikipediaArticle] = sc.parallelize(Seq(
  WikipediaArticle("a1", "abc Scala xyz"),
  WikipediaArticle("a2", "abc Python xyz"),
  WikipediaArticle("a3", "abc Scala xyz"),
  WikipediaArticle("a4", "abc Scala xyz")
))

Get a Subset of an RDD

val subsetArr = someRDD.take(1000)
val subsetRDD = sc.parallelize(subsetArr)

Create a DataFrame from a Seq

import org.apache.spark.sql.functions._

val df = spark.createDataFrame(Seq(
    (1, 1, 1, "2017-05-16 20:01:00.0"),
    (1, 2, 1, "2017-05-17 21:01:00.0"),
    (2, 1, 0, "2017-05-18 22:01:00.0"),
    (3, 1, 1, "2017-05-10 22:01:00.0")
  ))
  .toDF("user", "item", "star", "starred_at")
  .withColumn("starred_at", unix_timestamp(col("starred_at")).cast("timestamp"))
// you could override the schema
// val df = spark.createDataFrame(df.rdd, starringSchema)

case class Starring(user: Int, item: Int, star: Int, starred_at: java.sql.Timestamp)
val ds = df.as[Starring]

df.printSchema()
// root
// |-- user: integer (nullable = false)
// |-- item: integer (nullable = false)
// |-- star: integer (nullable = false)
// |-- starred_at: timestamp (nullable = false)

ref:
https://medium.com/@mrpowers/manually-creating-spark-dataframes-b14dae906393

Create a Dataset

import spark.implicits._

val ds1 = spark.read.json("people.json").as[Person]
val ds2 = df.toDS
val ds3 = rdd.toDS

ref:
https://www.coursera.org/learn/scala-spark-big-data/lecture/yrfPh/datasets

Statistics

repoInfoDS.count()
// 16000

repoInfoDS
  .describe("language", "size", "stargazers_count", "forks_count", "subscribers_count", "open_issues_count")
  .show()
// +-------+--------+-----------------+-----------------+----------------+-----------------+-----------------+
// |summary|language|             size| stargazers_count|     forks_count|subscribers_count|open_issues_count|
// +-------+--------+-----------------+-----------------+----------------+-----------------+-----------------+
// |  count|   16000|            16000|            16000|           16000|            16000|            16000|
// |   mean|    null|    14777.8928125|      238.5754375|       61.164375|        20.802375|         9.896625|
// | stddev|    null|133926.8365289536|1206.220336641683|394.950236056925|84.09955924587845|56.72585435688847|
// |    min|   ANTLR|                0|                2|               0|                0|                0|
// |    max|    XSLT|          9427027|            56966|           28338|             4430|             3503|
// +-------+--------+-----------------+-----------------+----------------+-----------------+-----------------+

repoInfoDS.stat.corr("stargazers_count", "forks_count")
// 0.8408082958003276

repoInfoDS.stat.corr("stargazers_count", "size")
// 0.05351197356230549

repoInfoDS.stat.freqItems(Seq("language"), 0.1).show(truncate=false)
// +-----------------------------------------------------------+
// |language_freqItems                                         |
// +-----------------------------------------------------------+
// |[Ruby, Objective-C, C, Go, JavaScript, Swift, Java, Python]|
// +-----------------------------------------------------------+

ref:
https://databricks.com/blog/2015/06/02/statistical-and-mathematical-functions-with-dataframes-in-spark.html

Generate a Schema from a List of Column Names

import org.apache.spark.sql.types._

def generateSchema(columnNames: List[String]): StructType = {
  val fields = columnNames.map({
    case columnName: String if columnName == "tucaseid" => {
      StructField(columnName, StringType, nullable = false)
    }
    case columnName => {
      StructField(columnName, DoubleType, nullable = false)
    }
  })
  StructType(fields.toList)
}

val columnNames = List("tucaseid", "gemetsta", "gtmetsta", "peeduca", "pehspnon", "ptdtrace", "teage")
val schema = generateSchema(columnNames)

Generate a Schema from a Case Class

import java.sql.Timestamp
import org.apache.spark.sql.Encoders

case class Starring(user: Int, item: Int, star: Int, starred_at: Timestamp)
val starringSchema = Encoders.product[Starring].schema

ref:
https://stackoverflow.com/questions/36746055/generate-a-spark-structtype-schema-from-a-case-class

Change a DataFrame's Schema

val newDF = spark.createDataFrame(oldDF.rdd, starringSchema)

Print the Vector Length of Specific Columns

import org.apache.spark.ml.linalg.Vector 

userProfileDF.select("languages_preferences_w2v", "clean_bio_w2v")
  .head
  .toSeq.foreach((field: Any) => {
    println(field.asInstanceOf[Vector].size)
  })

Convert a DataFrame into a Dataset

import java.sql.Timestamp
import org.apache.spark.ml.feature.SQLTransformer

case class Starring(user: Int, item: Int, star: Int, starred_at: Timestamp)

val starringBuilder = new SQLTransformer()
val starringSQL = """
SELECT from_user_id AS user, repo_id AS item, 1 AS star, starred_at
FROM __THIS__
ORDER BY user, starred_at DESC
"""
starringBuilder.setStatement(starringSQL)
val starringDS = starringBuilder.transform(rawDF).as[Starring]

Convert a Dataset into a PairRDD

val pairRDD = dataset
  .select("user", "item")
  .rdd
  .map(row => (row(0), row(1)))

// or

case class UserItemPair(user: Int, item: Int)

val pairRDD = dataset
  .select("user", "item").as[UserItemPair]
  .rdd
  .map(row => (row.user, row.item))

// or

import org.apache.spark.sql.Row

val pairRDD = dataset
  .select("user", "item")
  .rdd
  .map({
    case Row(user: Int, item: Int) => (user, item)
  })

ref:
https://stackoverflow.com/questions/30655914/map-rdd-to-pairrdd-in-scala

Convert a Dataset into a List or Set

val popularRepos = popularReposDS
  .select("item")
  .rdd
  .map(r => r(0).asInstanceOf[Int])
  .collect()
  .to[List]

// or

val popularRepos = popularReposDS
  .select($"repo_id".as[Int])
  .collect()
  .to[List]

// List(98837133, 97071697, 17439026, ...)

ref:
https://stackoverflow.com/questions/32000646/extract-column-values-of-dataframe-as-list-in-apache-spark

Add a Column to a DataFrame

import org.apache.spark.sql.functions.lit

val newDF = df.withColumn("starring", lit(1))

Repeat withColumn()

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lower}

val booleanColumnNames = Array("fork", "has_issues", "has_projects", "has_downloads", "has_wiki", "has_pages")
val convertedRepoInfoDF = booleanColumnNames.foldLeft[DataFrame](cleanRepoInfoDF)(
  (accDF, columnName) => accDF.withColumn(s"clean_$columnName", col(columnName).cast("int"))
)

// or

val lowerableColumnNames = Array("description", "language", "topics")
val booleanColumnNames = Array("fork", "has_issues", "has_projects", "has_downloads", "has_wiki", "has_pages")
val convertedRepoInfoDF = (lowerableColumnNames ++ booleanColumnNames).foldLeft[DataFrame](cleanRepoInfoDF)((accDF, columnName) => {
  columnName match {
    case _ if lowerableColumnNames.contains(columnName) => 
      accDF.withColumn(s"clean_$columnName", lower(col(columnName)))
    case _ if booleanColumnNames.contains(columnName) => 
      accDF.withColumn(s"clean_$columnName", col(columnName).cast("int"))
  }
})

ref:
https://stackoverflow.com/questions/41400504/spark-scala-repeated-calls-to-withcolumn-using-the-same-function-on-multiple-c

Indicate Whether a Row Has Any Null Column

import org.apache.spark.sql.functions.when

val nullableColumnNames = Array("bio", "blog", "company", "email", "location", "name")

val df2 = df1.withColumn("has_null", when(nullableColumnNames.map(df1(_).isNull).reduce(_||_), true).otherwise(false))
// or
val df2 = df1.withColumn("has_null", when($"bio".isNull or
                                          $"blog".isNull or
                                          $"company".isNull or
                                          $"email".isNull or
                                          $"location".isNull or
                                          $"name".isNull, true).otherwise(false))

Combine Two Columns into a Struct

import org.apache.spark.sql.functions._

fixRepoInfoDS
  .where("topics != ''")
  .withColumn("tags", struct("language", "topics"))
  .select("tags")
  .show(truncate=false)
// +----------------------------------------------------------------------+
// |tags                                                                  |
// +----------------------------------------------------------------------+
// |[Ruby,aasm,activerecord,mongoid,ruby,state-machine,transition]        |
// |[Ruby,captcha,rails,recaptcha,ruby,sinatra]                           |
// |[Python,commandline,gtd,python,sqlite,todo]                           |
// ...
// +----------------------------------------------------------------------+

Select Nested Column Directly

userRecommendationsDF.show(truncate=false)
// +----+-----------------------------------------------------------------+
// |user|recommendations                                                  |
// +----+-----------------------------------------------------------------+
// |1   |[[2,0.9997897], [4,0.9988761], [1,0.9988487], [5,0.0]]           |
// |3   |[[5,0.9984464], [1,2.9802322E-8], [2,0.0], [4,0.0]]              |
// |2   |[[4,0.9921428], [2,0.10759391], [1,0.10749264], [5,1.4901161E-8]]|
// +----+-----------------------------------------------------------------+

userRecommendationsDF.printSchema()
// root
// |-- user: integer (nullable = false)
// |-- recommendations: array (nullable = true)
// |    |-- element: struct (containsNull = true)
// |    |    |-- item: integer (nullable = true)
// |    |    |-- rating: float (nullable = true)

val userItemsDF = userRecommendationsDF.select($"user", $"recommendations.item")
// +----+------------+
// |user|        item|
// +----+------------+
// |   1|[2, 4, 1, 5]|
// |   3|[5, 1, 2, 4]|
// |   2|[4, 2, 1, 5]|
// +----+------------+

ref:
https://stackoverflow.com/questions/38413040/spark-sql-udf-with-complex-input-parameter

Flatten an Array Column

import org.apache.spark.sql.Row
import spark.implicits._

val userRecommenderItemDF = alsModel.recommendForAllUsers(15)
  .flatMap((row: Row) => {
    val userID = row.getInt(0)
    val recommendations = row.getSeq[Row](1)
    recommendations.map {
      case Row(repoID: Int, score: Float) => {
        (userID, repoID, score, "als")
      }
    }
  })
  .toDF("user_id", "repo_id", "score", "source")

userRecommenderItemDF.show()
// +-------+-------+-------------+------+
// |user_id|repo_id|score        |source|
// +-------+-------+-------------+------+
// |2142   |48     |0.05021151   |als   |
// |2142   |118    |0.05021151   |als   |
// |2142   |68     |0.7791124    |als   |
// |2142   |98     |0.9939307    |als   |
// |2142   |28     |0.53719014   |als   |
// +-------+-------+-------------+------+

Define Conditional Columns

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._

val workingStatusProjection: Column = when($"telfs" >= 1 && $"telfs" < 3, "working").
                                      otherwise("not working").
                                      as("working")

val ageProjection: Column = when($"teage" >= 15 && $"teage" <= 22 , "young").
                            when($"teage" >= 23 && $"teage" <= 55 , "active").
                            otherwise("elder").
                            as("age")

val workProjection: Column = workColumns.reduce(_+_).divide(60).as("work_hours")

val newDF = df.select(workingStatusProjection, ageProjection, workProjection)
// +-----------+------+------------------+
// |    working|   age|        work_hours|
// +-----------+------+------------------+
// |    working| elder|               0.0|
// |    working|active|               0.0|
// |    working|active|               0.0|
// |not working|active|               2.0|
// |    working|active| 8.583333333333334|
// +-----------+------+------------------+

Create a Custom Transformation Function for any DataFrame

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.lit
import spark.implicits._

def withGreeting(df: DataFrame): DataFrame = {
  df.withColumn("greeting", lit("hello world"))
}

def withCat(name: String)(df: DataFrame): DataFrame = {
  df.withColumn("cat", lit(s"$name meow"))
}

val df = Seq(
  "funny",
  "person"
).toDF("something")

val weirdDf = df
  .transform(withGreeting)
  .transform(withCat("kitten"))
// +---------+-----------+-----------+
// |something|   greeting|        cat|
// +---------+-----------+-----------+
// |    funny|hello world|kitten meow|
// |   person|hello world|kitten meow|
// +---------+-----------+-----------+

ref:
https://medium.com/@mrpowers/chaining-custom-dataframe-transformations-in-spark-a39e315f903c

Combine Two Arrays from Two DataFrames

val bothItemsRDD = userPredictedItemsDF.join(userActualItemsDF, Seq("user_id"))
  .select(userPredictedItemsDF.col("items"), userActualItemsDF.col("items"))
  .rdd
  .map({
    case Row(userPredictedItems: Seq[Int], userActualItems: Seq[Int]) => {
      (userPredictedItems.slice(0, $(k)).toArray, userActualItems.slice(0, $(k)).toArray)
    }
  })

ref:
https://stackoverflow.com/questions/28166555/how-to-convert-row-of-a-scala-dataframe-into-case-class-most-efficiently

Handle Missing Values (NaN and Null)

NaN stands for "Not a Number"; it's usually the result of a mathematical operation that doesn't make sense, e.g. 0.0/0.0.

val df1 = repoInfoDF.na.fill("")
val df2 = repoInfoDF.na.fill("", Seq("description", "homepage"))
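
For numeric columns that may also contain NaN, a minimal sketch along the same lines (reusing the stargazers_count column from the Statistics section):

// drop rows whose listed numeric columns contain null or NaN
val df3 = repoInfoDF.na.drop(Seq("stargazers_count"))

// or replace null / NaN in those columns with a default value
val df4 = repoInfoDF.na.fill(0.0, Seq("stargazers_count"))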

ref:
https://stackoverflow.com/questions/33089781/spark-dataframe-handing-empty-string-in-onehotencoder
https://stackoverflow.com/questions/43882699/which-differences-there-are-between-null-and-nan-in-spark-how-to-deal-with

Calculate the Difference between Two Dates

import org.apache.spark.sql.functions._

df.withColumn("updated_at_days_since_today", datediff(current_date(), $"updated_at"))

ref:
https://databricks.com/blog/2015/09/16/apache-spark-1-5-dataframe-api-highlights.html

Create a User-Defined Function (UDF) which Accepts One Column

import org.apache.spark.sql.functions.udf

val df = Seq(
  (1, "Company Name Inc."),
  (2, "Company Name Co."), 
  (3, "Company Name ")
).toDF("id", "company")

val removeUninformativeWords = (text: String) => {
  // use replace() for literal matching since replaceAll() treats its pattern as a regex
  text
    .toLowerCase()
    .replace(", inc.", "")
    .replace(" inc.", "")
    .replace("-inc", "")
    .replace(" co., ltd.", "")
    .replace(" co.", "")
    .replace(" ltd.", "")
    .replace(".com", "")
    .replace("@", "")
    .trim()
}
val removeUninformativeWordsUDF = udf(removeUninformativeWords)

val newDF = df.withColumn("fix_company", removeUninformativeWordsUDF($"company"))
newDF.show()

ref:
https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-sql-udfs.html

Create a User-Defined Function (UDF) which Accepts Multiple Columns

When defining such a UDF, you must use Row to map a StructType column; custom case classes are not supported as input types.

import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.Row

userRecommendationsDF.printSchema()
// root
// |-- user: integer (nullable = false)
// |-- recommendations: array (nullable = true)
// |    |-- element: struct (containsNull = true)
// |    |    |-- item: integer (nullable = true)
// |    |    |-- rating: float (nullable = true)

val extractItemsUDF = udf((user: Int, recommendations: Seq[Row]) => {
  val items: Seq[Int] = recommendations.map(_.getInt(0))
  val ratings: Seq[Float] = recommendations.map(_.getFloat(1))
  items
})
userRecommendationsDF.withColumn("items", extractItemsUDF($"user", $"recommendations"))
// +----+-----------------------------------------------------------------+------------+
// |user|recommendations                                                  |items       |
// +----+-----------------------------------------------------------------+------------+
// |1   |[[2,0.9997897], [4,0.9988761], [1,0.9988487], [5,0.0]]           |[2, 4, 1, 5]|
// |3   |[[5,0.9984464], [1,2.9802322E-8], [2,0.0], [4,0.0]]              |[5, 1, 2, 4]|
// |2   |[[4,0.9921428], [2,0.10759391], [1,0.10749264], [5,1.4901161E-8]]|[4, 2, 1, 5]|
// +----+-----------------------------------------------------------------+------------+

ref:
https://stackoverflow.com/questions/32196207/derive-multiple-columns-from-a-single-column-in-a-spark-dataframe

The GenericRowWithSchema cannot be cast to XXX issue:

There is a strict mapping between Spark SQL data types and Scala types, such as IntegerType vs. Int, TimestampType vs. java.sql.Timestamp and StructType vs. org.apache.spark.sql.Row.
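
As a quick illustration of that mapping, a minimal sketch based on the userRecommendationsDF above: a UDF that receives a struct element gets a Row, and each field has to be read with the matching Scala type:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf

// the struct element arrives as a Row, not as a custom case class
val firstItemUDF = udf((recommendation: Row) => recommendation.getAs[Int]("item"))

userRecommendationsDF.withColumn("first_item", firstItemUDF($"recommendations".getItem(0)))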

ref:
https://stackoverflow.com/questions/38413040/spark-sql-udf-with-complex-input-parameter
https://spark.apache.org/docs/latest/sql-programming-guide.html#data-types

Create a User-Defined Function (UDF) with Extra Parameters

import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions._
import scala.util.control.Breaks._

val df = spark.createDataFrame(Seq(
  ("Bob", 35, "Web Backend Developer"),
  ("Cathy", 24, "UX Designer"),
  ("Vic", 26, "PM"),
  ("Tom", 29, "Back end Engineer")
)).toDF("name", "age", "occupation")

// the return type of this UDF is Double
def containsAnyOfUDF(substrings: Array[String]): UserDefinedFunction = udf[Double, String]((text: String) => {
  var result = 0.0
  breakable {
    for (substring <- substrings) {
      if (text.contains(substring)) {
        result = 1.0
        break
      }
    }
  }
  result
})

val backends = Array("backend", "back end")
df.withColumn("knows_backend", containsAnyOfUDF(backends)(lower($"occupation"))))
// +-----+---+---------------------+-------------+
// |name |age|occupation           |knows_backend|
// +-----+---+---------------------+-------------+
// |Bob  |35 |Web Backend Developer|1.0          |
// |Cathy|24 |UX Designer          |0.0          |
// |Vic  |26 |PM                   |0.0          |
// |Tom  |29 |Back end Engineer    |1.0          |
// +-----+---+---------------------+-------------+

ref:
https://stackoverflow.com/questions/35546576/how-can-i-pass-extra-parameters-to-udfs-in-sparksql

Select Rows that Contain Certain String in a DataFrame

userInfoDF
  .where("company LIKE '%Inc%'")
  .orderBy($"company".desc)

Order By One of the Elements in a List Column

import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.functions.udf

val to_array = udf((v: Vector) => v.toDense.values)

resultTestDF
  .where("user_id = 652070")
  .orderBy(to_array($"probability").getItem(1).desc)
  .select("user_id", "repo_id", "starring", "prediction", "probability")
  .show(false)

Show Distinct Values

df.select($"company").distinct().orderBy($"company".desc)

import org.apache.spark.sql.functions.approx_count_distinct

imputedUserInfoDF.select(approx_count_distinct($"company")).show()
// 39581

ref:
https://databricks.com/blog/2016/05/19/approximate-algorithms-in-apache-spark-hyperloglog-and-quantiles.html

Group By then Aggregation

import org.apache.spark.sql.functions.count

val df = userInfoDF
  .groupBy($"company")
  .agg(count("user_id").alias("count"))
  .orderBy($"count".desc)
  .limit(1000)

Group By then Get First/Top n Items

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{collect_list, rank}

val topN = 10
val window = Window.partitionBy($"user_id").orderBy($"starred_at".desc)
val userLanguagesDF = repoInfoStarringDF
  .withColumn("rank", rank.over(window))
  .where($"rank" <= topN)
  .groupBy($"user_id")
  .agg(collect_list($"language").alias("languages"))
// +--------+-----------------------------------------------------------------+
// |user_id |languages                                                        |
// +--------+-----------------------------------------------------------------+
// |2142    |[JavaScript, Python, Python, Ruby, Go]                           |
// |59990   |[Python, JavaScript, Go, JavaScript, Go]                         |
// |101094  |[JavaScript, C++, Max, C, JavaScript]                            |
// |109622  |[PHP, PHP, PHP, PHP, Python]                                     |
// |201694  |[TypeScript, Python, Vim script, Vim script, Python]             |
// +--------+-----------------------------------------------------------------+

ref:
https://stackoverflow.com/questions/33655467/get-topn-of-all-groups-after-group-by-using-spark-dataframe
https://stackoverflow.com/questions/35918262/spark-topn-after-groupby

or

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

val df = spark.createDataFrame(Seq(
  (2, 1, 0, "2017-05-18 22:01:00.0"),
  (1, 1, 1, "2017-05-16 20:01:00.0"),
  (2, 5, 0, "2017-05-21 22:01:00.0"),
  (1, 2, 1, "2017-05-17 21:01:00.0"),
  (1, 4, 1, "2017-05-17 22:01:00.0"),
  (1, 7, 1, "2017-05-12 22:01:00.0"),
  (3, 1, 1, "2017-05-19 22:01:00.0"),
  (3, 2, 1, "2017-05-30 22:01:00.0"),
  (3, 5, 1, "2017-05-01 22:01:00.0"),
  (1, 6, 1, "2017-05-19 22:01:00.0")
)).toDF("user_id", "repo_id", "starring", "starred_at")

val windowSpec = Window.partitionBy($"user_id").orderBy($"starred_at".desc)
val userActualItemsDF = df
  .withColumn("row_number", row_number().over(windowSpec))
  .where($"row_number" <= 3)
// +-------+-------+--------+---------------------+----------+
// |user_id|repo_id|starring|starred_at           |row_number|
// +-------+-------+--------+---------------------+----------+
// |1      |6      |1       |2017-05-19 22:01:00.0|1         |
// |1      |4      |1       |2017-05-17 22:01:00.0|2         |
// |1      |2      |1       |2017-05-17 21:01:00.0|3         |
// |3      |2      |1       |2017-05-30 22:01:00.0|1         |
// |3      |1      |1       |2017-05-19 22:01:00.0|2         |
// |3      |5      |1       |2017-05-01 22:01:00.0|3         |
// |2      |5      |0       |2017-05-21 22:01:00.0|1         |
// |2      |1      |0       |2017-05-18 22:01:00.0|2         |
// +-------+-------+--------+---------------------+----------+

ref:
https://stackoverflow.com/questions/33878370/how-to-select-the-first-row-of-each-group
http://xinhstechblog.blogspot.tw/2016/04/spark-window-functions-for-dataframes.html

Group By then Concatenate Strings

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val userTopicsDF = repoInfoStarringDF
  .where($"topics" =!= "")
  .withColumn("rank", rank.over(Window.partitionBy($"user_id").orderBy($"starred_at".desc)))
  .where($"rank" <= 50)
  .groupBy($"user_id")
  .agg(concat_ws(",", collect_list($"topics")).alias("topics_preferences"))
// +--------+-------------------------------------------------------------------+
// | user_id|                                                 topics_preferences|
// +--------+-------------------------------------------------------------------+
// |    2142|go,golang,grpc,microservice,protobuf,consul,go,golang,load-balancer|
// |   59990|api,api-gateway,aws,aws-infrastructure,aws-lambda,deploy-tool      |
// |  101094|ableton,javascript,maxmsp,c,c89,gui,imgui,nuklear                  |
// |  109622|stripe,algolia,magento,magento-algoliasearch,php,search,aws        |
// +--------+-------------------------------------------------------------------+

ref:
https://community.hortonworks.com/questions/44886/dataframe-groupby-and-concat-non-empty-strings.html

Group By after Order By Maintains Order

val list = Seq(
  (2, 1, 0, "2017-05-18 22:01:00.0"),
  (1, 1, 1, "2017-05-16 20:01:00.0"),
  (2, 5, 0, "2017-05-21 22:01:00.0"),
  (1, 2, 1, "2017-05-17 21:01:00.0"),
  (1, 4, 1, "2017-05-17 22:01:00.0"),
  (3, 1, 1, "2017-05-19 22:01:00.0"),
  (3, 2, 1, "2017-05-30 22:01:00.0"),
  (3, 5, 1, "2017-05-01 22:01:00.0"),
  (1, 6, 1, "2017-05-19 22:01:00.0")
)
val df = list.toDF("user_id", "repo_id", "starring", "starred_at")

val userActualItemsDF = df
  .orderBy($"starred_at".desc)
  .groupBy($"user_id")
  .agg(collect_list($"repo_id").alias("items"))
// +-------+------------+
// |user_id|       items|
// +-------+------------+
// |      1|[6, 4, 2, 1]|
// |      3|   [2, 1, 5]|
// |      2|      [5, 1]|
// +-------+------------+

ref:
https://stackoverflow.com/questions/39505599/spark-dataframe-does-groupby-after-orderby-maintain-that-order

collect_list Multiple Columns

import org.apache.spark.sql.functions.{collect_list, struct}

predictionDF
// +----+----+------+----------+
// |user|item|rating|prediction|
// +----+----+------+----------+
// |   1|   1|    12| 0.9988487|
// |   3|   5|     8| 0.9984464|
// |   1|   4|     4|0.99887615|
// |   2|   4|     1| 0.9921428|
// |   1|   2|    90| 0.9997897|
// +----+----+------+----------+

val userRecommendationsDF = predictionDF
  .groupBy($"user")
  .agg(collect_list(struct($"item", $"prediction")).alias("recommendations"))
// +----+----------------------------------------------+
// |user|recommendations                               |
// +----+----------------------------------------------+
// |1   |[[1,0.9988487], [4,0.99887615], [2,0.9997897]]|
// |3   |[[5,0.9984464]]                               |
// |2   |[[4,0.9921428]]                               |
// +----+----------------------------------------------+

groupByKey() on Dataset

val keyValues = List(
  (3, "Me"), (1, "Thi"), (2, "Se"), (3, "ssa"), (1, "sIsA"), (3, "ge:"), (3, "-"), (2, "cre"), (2, "t")
)
val keyValuesDS = keyValues.toDS
val ds = keyValuesDS
  .groupByKey(p => p._1)
  .mapValues(p => p._2)
  .reduceGroups((acc, str) => acc + str)

ref:
https://www.coursera.org/learn/scala-spark-big-data/lecture/yrfPh/datasets

case class TimeUsageRow(
  working: String,
  sex: String,
  age: String,
  primaryNeeds: Double,
  work: Double,
  other: Double
)

val summaryDs = computeSummaryDs() // placeholder: whatever returns the Dataset[TimeUsageRow] shown below
// +-----------+------+------+------------------+------------------+------------------+
// |    working|   sex|   age|      primaryNeeds|              work|             other|
// +-----------+------+------+------------------+------------------+------------------+
// |    working|  male| elder|             15.25|               0.0|              8.75|
// |    working|female|active|13.833333333333334|               0.0|10.166666666666666|
// |    working|female|active|11.916666666666666|               0.0|12.083333333333334|
// |not working|female|active|13.083333333333334|               2.0| 8.916666666666666|
// |    working|  male|active|11.783333333333333| 8.583333333333334|3.6333333333333333|
// +-----------+------+------+------------------+------------------+------------------+

import org.apache.spark.sql.Encoders
import org.apache.spark.sql.expressions.scalalang.typed
import org.apache.spark.sql.functions.round

val finalDs = summaryDs
  .groupByKey((row: TimeUsageRow) => {
    (row.working, row.sex, row.age)
  })
  .agg(
    round(typed.avg[TimeUsageRow](_.primaryNeeds), 1).as(Encoders.DOUBLE),
    round(typed.avg[TimeUsageRow](_.work), 1).as(Encoders.DOUBLE),
    round(typed.avg[TimeUsageRow](_.other), 1).as(Encoders.DOUBLE)
  )
  .map({
    case ((working, sex, age), primaryNeeds, work, other) => {
      TimeUsageRow(working, sex, age, primaryNeeds, work, other)
    }
  })
  .orderBy("working", "sex", "age")
  .as[TimeUsageRow]
// +-----------+------+------+------------+----+-----+
// |    working|   sex|   age|primaryNeeds|work|other|
// +-----------+------+------+------------+----+-----+
// |not working|female|active|        12.4| 0.5| 10.8|
// |not working|female| elder|        10.9| 0.4| 12.4|
// |not working|female| young|        12.5| 0.2| 11.1|
// |not working|  male|active|        11.4| 0.9| 11.4|
// |not working|  male| elder|        10.7| 0.7| 12.3|
// +-----------+------+------+------------+----+-----+

ref:
https://stackoverflow.com/questions/39946210/spark-2-0-datasets-groupbykey-and-divide-operation-and-type-safety

Aggregator

import org.apache.spark.sql.Encoder
import org.apache.spark.sql.expressions.Aggregator

val myAgg = new Aggregator[IN, BUF, OUT] {
  def zero: BUF = ??? // the initial value
  def reduce(b: BUF, a: IN): BUF = ??? // add an element to the running total
  def merge(b1: BUF, b2: BUF): BUF = ??? // merge intermediate values
  def finish(b: BUF): OUT = ??? // return the final result
  override def bufferEncoder: Encoder[BUF] = ???
  override def outputEncoder: Encoder[OUT] = ???
}.toColumn
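
A concrete sketch, assuming the Starring case class and the ds Dataset from the "Create a DataFrame from a Seq" section, that averages the star column per user:

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

// a typed aggregator that keeps a (sum, count) buffer and returns the average star value
val avgStar = new Aggregator[Starring, (Long, Long), Double] {
  def zero: (Long, Long) = (0L, 0L)
  def reduce(b: (Long, Long), a: Starring): (Long, Long) = (b._1 + a.star, b._2 + 1)
  def merge(b1: (Long, Long), b2: (Long, Long)): (Long, Long) = (b1._1 + b2._1, b1._2 + b2._2)
  def finish(b: (Long, Long)): Double = b._1.toDouble / b._2
  override def bufferEncoder: Encoder[(Long, Long)] = Encoders.tuple(Encoders.scalaLong, Encoders.scalaLong)
  override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}.toColumn

val avgStarPerUserDS = ds.groupByKey(_.user).agg(avgStar)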

ref:
https://www.coursera.org/learn/scala-spark-big-data/lecture/yrfPh/datasets

Join Two DataFrames

val starringDF = spark.createDataFrame(Seq(
    (652070, 21289110, 1),
    (652070, 4164482, 1),
    (652070, 44838949, 0),
    (1912583, 4164482, 0)
)).toDF("user_id", "repo_id", "starring")

val repoDF = spark.createDataFrame(Seq(
    (21289110, "awesome-python", "Python", 37336),
    (4164482, "django", "Python", 27451),
    (44838949, "swift", "C++", 39840)
)).toDF("id", "repo_name", "repo_language", "stargazers_count")

val fullDF = starringDF.join(repoDF, starringDF.col("repo_id") === repoDF.col("id"))
// +-------+--------+--------+--------+--------------+-------------+----------------+
// |user_id| repo_id|starring|      id|     repo_name|repo_language|stargazers_count|
// +-------+--------+--------+--------+--------------+-------------+----------------+
// | 652070|21289110|       1|21289110|awesome-python|       Python|           37336|
// | 652070| 4164482|       1| 4164482|        django|       Python|           27451|
// | 652070|44838949|       0|44838949|         swift|          C++|           39840|
// |1912583| 4164482|       0| 4164482|        django|       Python|           27451|
// +-------+--------+--------+--------+--------------+-------------+----------------+

ref:
https://docs.databricks.com/spark/latest/faq/join-two-dataframes-duplicated-column.html

Broadcast Join
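
A minimal sketch, reusing starringDF and repoDF from the "Join Two DataFrames" section above:

import org.apache.spark.sql.functions.broadcast

// mark the smaller DataFrame as broadcastable so Spark ships a copy of it to every
// executor and performs a BroadcastHashJoin instead of shuffling both sides
val fullDF = starringDF.join(broadcast(repoDF), starringDF.col("repo_id") === repoDF.col("id"))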

ref:
https://docs.cloud.databricks.com/docs/latest/databricks_guide/04%20SQL%2C%20DataFrames%20%26%20Datasets/05%20BroadcastHashJoin%20-%20scala.html

Setup Spark, Scala and Maven with Intellij IDEA

IntelliJ IDEA supports Scala and Apache Spark well. You can browse a complete Spark project built with IntelliJ IDEA on GitHub: https://github.com/vinta/albedo

Initiate a Maven Project

$ mvn archetype:generate
# when prompted "Choose a number", pick the one that corresponds to
# "remote -> net.alchim31.maven:scala-archetype-simple"

ref:
https://docs.scala-lang.org/tutorials/scala-with-maven.html

Example Configurations

The remaining sections of this article assume you use the following pom.xml, which should work out of the box.

<!-- in pom.xml -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>ws.vinta</groupId>
  <artifactId>albedo</artifactId>
  <version>1.0.0-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>${project.artifactId}</name>
  <description>A recommender system for discovering GitHub repos</description>
  <url>https://github.com/vinta/albedo</url>
  <inceptionYear>2017</inceptionYear>
  <properties>
    <java.version>1.8</java.version>
    <scala.version>2.11.8</scala.version>
    <scala.compactVersion>2.11</scala.compactVersion>
    <spark.version>2.2.0</spark.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
  <repositories>
    <repository>
      <id>spark-packages</id>
      <name>Spark Packages Repository</name>
      <url>https://dl.bintray.com/spark-packages/maven/</url>
    </repository>
  </repositories>
  <dependencies>
    <!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.compactVersion}</artifactId>
      <version>${spark.version}</version>
      <scope>compile</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.11 -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_${scala.compactVersion}</artifactId>
      <version>${spark.version}</version>
      <scope>compile</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-mllib_2.11 -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-mllib_${scala.compactVersion}</artifactId>
      <version>${spark.version}</version>
      <scope>compile</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.42</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.httpcomponents/httpclient -->
    <dependency>
      <groupId>org.apache.httpcomponents</groupId>
      <artifactId>httpclient</artifactId>
      <version>4.5.2</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.elasticsearch.client/elasticsearch-rest-high-level-client -->
    <dependency>
      <groupId>org.elasticsearch.client</groupId>
      <artifactId>elasticsearch-rest-high-level-client</artifactId>
      <version>5.6.2</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.hankcs/hanlp -->
    <dependency>
      <groupId>com.hankcs</groupId>
      <artifactId>hanlp</artifactId>
      <version>portable-1.3.4</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.github.rholder/snowball-stemmer -->
    <dependency>
      <groupId>com.github.rholder</groupId>
      <artifactId>snowball-stemmer</artifactId>
      <version>1.3.0.581.1</version>
    </dependency>
  </dependencies>
  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.6.1</version>
        <configuration>
          <source>${java.version}</source>
          <target>${java.version}</target>
          <encoding>UTF-8</encoding>
        </configuration>
      </plugin>
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.1</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-install-plugin</artifactId>
        <version>2.5.2</version>
        <configuration>
          <skip>true</skip>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.1.0</version>
        <executions>
          <execution>
           <phase>install</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <filters>
                <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                    <exclude>META-INF/*.SF</exclude>
                  </excludes>
                </filter>
              </filters>
              <artifactSet>
                <excludes>
                  <exclude>com.apple:AppleJavaExtensions:*</exclude>
                  <exclude>javax.servlet:*</exclude>
                  <exclude>org.apache.hadoop:*</exclude>
                  <exclude>org.apache.maven.plugins:*</exclude>
                  <exclude>org.apache.parquet:*</exclude>
                  <exclude>org.apache.spark:*</exclude>
                  <exclude>org.scala-lang:*</exclude>
                </excludes>
              </artifactSet>
              <finalName>${project.artifactId}-${project.version}-uber</finalName>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>

ref:
https://davidb.github.io/scala-maven-plugin/example_compile.html

Generate a Thin JAR

A thin JAR only contains the classes you created, which means you have to provide your dependencies externally (for example, via --packages).

$ mvn clean package -DskipTests

You can select different main classes within the same JAR via --class.

$ spark-submit \
--master spark://localhost:7077 \
--packages "mysql:mysql-connector-java:5.1.41" \
--class ws.vinta.albedo.LogisticRegressionRanker \
target/albedo-1.0.0-SNAPSHOT.jar

ref:
https://stackoverflow.com/questions/1082580/how-to-build-jars-from-intellij-properly
https://spark.apache.org/docs/latest/submitting-applications.html

Generate a Fat JAR, Shaded JAR or Uber JAR

CAUTION: DO NOT ENABLE <minimizeJar>true</minimizeJar> in the maven-shade-plugin, it will ruin your day!

<!-- in pom.xml -->
<project>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-install-plugin</artifactId>
        <version>2.5.2</version>
        <configuration>
          <skip>true</skip>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.1.0</version>
        <executions>
          <execution>
            <phase>install</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <artifactSet>
                <excludes>
                  <exclude>com.apple:AppleJavaExtensions:*</exclude>
                  <exclude>javax.servlet:*</exclude>
                  <exclude>org.apache.hadoop:*</exclude>
                  <exclude>org.apache.maven.plugins:*</exclude>
                  <exclude>org.apache.parquet:*</exclude>
                  <exclude>org.apache.spark:*</exclude>
                  <exclude>org.scala-lang:*</exclude>
                </excludes>
              </artifactSet>
              <finalName>${project.artifactId}-${project.version}-uber</finalName>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>

# the output jar will be located at "target/albedo-1.0.0-SNAPSHOT-uber.jar"
$ mvn clean install -DskipTests
$ spark-submit \
--master spark://localhost:7077 \
--class ws.vinta.albedo.LogisticRegressionRanker \
target/albedo-1.0.0-SNAPSHOT-uber.jar

ref:
http://maven.apache.org/plugins/maven-shade-plugin/examples/includes-excludes.html
https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/troubleshooting/missing_dependencies_in_jar_files.html

Run a Spark Application in Local Mode

Follow "Run > Edit Configurations":

  • VM options: -Xms12g -Xmx12g -Dspark.master="local[*]"
  • Before launch:
    • Build

Or

// in YourSparkApp.scala
package ws.vinta.albedo

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object YourSparkApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .set("spark.driver.memory", "12g")

    val spark = SparkSession
      .builder()
      .appName("YourSparkApp")
      .config(conf)
      .getOrCreate()

    spark.stop()
  }
}

ref:
https://github.com/jaceklaskowski/mastering-apache-spark-book/blob/master/spark-sparkcontext-creating-instance-internals.adoc
https://stackoverflow.com/questions/43054268/how-to-set-spark-memorystore-size-when-running-in-intellij-scala-console

Run a Spark Application in Standalone Mode

First, start your Spark Standalone cluster:

$ cd ${SPARK_HOME}
$ ./sbin/start-master.sh -h 0.0.0.0
$ ./sbin/start-slave.sh spark://localhost:7077

# print logs from the Spark master and workers, useful for debugging
$ tail -f ${SPARK_HOME}/logs/*

Follow "Run > Edit Configurations":

  • VM options: -Dspark.master=spark://localhost:7077 -Dspark.driver.memory=2g -Dspark.executor.memory=12g -Dspark.executor.cores=3
    • Local cluster mode: -Dspark.master="local-cluster[x, y, z]" with x workers, y cores per worker, and z MB memory per worker
    • Local cluster mode doesn't need a real Spark Standalone cluster
  • Before launch:
    • Build
    • Run Maven Goal 'albedo: clean install'

Or

// in YourSparkApp.scala
package ws.vinta.albedo

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object YourSparkApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("spark://localhost:7077")
      .set("spark.driver.memory", "2g")
      .set("spark.executor.memory", "12g")
      .set("spark.executor.cores", "3")
      .setJars(List("target/albedo-1.0.0-SNAPSHOT-uber.jar"))
      // or
      .setMaster("local-cluster[1, 3, 12288]")
      .setJars(List("target/albedo-1.0.0-SNAPSHOT-uber.jar"))

    val spark = SparkSession
      .builder()
      .appName("YourSparkApp")
      .config(conf)
      .getOrCreate()

    spark.stop()
  }
}

Finally, here are some terms that are worth clarifying:

  • compile: compile a single *.java or *.scala file into *.class files
  • make: compile changed files only
  • build: compile every file in the project

ref:
http://www.jianshu.com/p/b4e4658c459c

Specify a Custom Logging Configuration

$ cd PROJECT_ROOT
$ cp $SPARK_HOME/conf/log4j.properties.template log4j.properties

Follow "Run > Edit Configurations":

  • VM options: -Dlog4j.configuration=file:./log4j.properties

ref:
https://spark.apache.org/docs/latest/sql-programming-guide.html#getting-started
https://stackoverflow.com/questions/43054268/how-to-set-spark-memorystore-size-when-running-in-intellij-scala-console
https://community.cloudera.com/t5/Advanced-Analytics-Apache-Spark/Config-log4j-in-Spark/td-p/34968

Spark troubleshooting

Apache Spark 2.x Troubleshooting Guide
https://www.slideshare.net/jcmia1/a-beginners-guide-on-troubleshooting-spark-applications
https://www.slideshare.net/jcmia1/apache-spark-20-tuning-guide

Check your cluster UI to ensure that workers are registered and have sufficient resources

PYSPARK_DRIVER_PYTHON="jupyter" \
PYSPARK_DRIVER_PYTHON_OPTS="notebook --ip 0.0.0.0" \
pyspark \
--packages "org.xerial:sqlite-jdbc:3.16.1,com.github.fommil.netlib:all:1.1.2" \
--driver-memory 4g \
--executor-memory 20g \
--master spark://TechnoCore.local:7077

TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources

The --executor-memory you specified probably exceeds the memory available on the workers.

You can see how much memory each worker has in the Spark Master UI at http://localhost:8080/. If the workers have different amounts of available memory, Spark will only pick the workers whose memory is larger than --executor-memory.

ref:
https://spoddutur.github.io/spark-notes/distribution_of_executors_cores_and_memory_for_spark_application

SparkContext was shut down

ERROR Executor: Exception in task 1.0 in stage 6034.0 (TID 21592)
java.lang.StackOverflowError
...
ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerJobEnd(55,1494185401195,JobFailed(org.apache.spark.SparkException: Job 55 cancelled because SparkContext was shut down))

The executors probably ran out of memory, causing an Out Of Memory (OOM) error.

ref:
http://stackoverflow.com/questions/32822948/sparkcontext-was-shut-down-while-running-spark-on-a-large-dataset

Container exited with a non-zero exit code 56 (or some other numbers)

WARN org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Container marked as failed: container_1504241464590_0001_01_000002 on host: albedo-w-1.c.albedo-157516.internal. Exit status: 56. Diagnostics: Exception from container-launch.
Container id: container_1504241464590_0001_01_000002
Exit code: 56
Stack trace: ExitCodeException exitCode=56:
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:972)
    at org.apache.hadoop.util.Shell.run(Shell.java:869)
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1170)
    at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:236)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:305)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:84)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)

Container exited with a non-zero exit code 56

The executors probably ran out of memory, causing an Out Of Memory (OOM) error.

ref:
http://stackoverflow.com/questions/39038460/understanding-spark-container-failure

Exception in thread "main" java.lang.StackOverflowError

Exception in thread "main" java.lang.StackOverflowError
    at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at scala.collection.immutable.List$SerializationProxy.writeObject(List.scala:468)
    at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    ...

Solution:

import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.sql.SparkSession

val spark: SparkSession = SparkSession.builder().getOrCreate()
val sc = spark.sparkContext
sc.setCheckpointDir("./spark-data/checkpoint")

// calling sc.setCheckpointDir() is enough to enable checkpointing,
// so you don't necessarily have to set checkpointInterval explicitly
val als = new ALS()
  .setCheckpointInterval(2)

ref:
https://stackoverflow.com/questions/31484460/spark-gives-a-stackoverflowerror-when-training-using-als
https://stackoverflow.com/questions/35127720/what-is-the-difference-between-spark-checkpoint-and-persist-to-a-disk

Randomness of hash of string should be disabled via PYTHONHASHSEED

Solution:

$ cd $SPARK_HOME
$ cp conf/spark-env.sh.template conf/spark-env.sh
$ echo "export PYTHONHASHSEED=42" >> conf/spark-env.sh

ref:
https://issues.apache.org/jira/browse/SPARK-13330

It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation

Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

spark.sparkContext can only be accessed from the driver program, not from the workers (for example, the lambda functions passed to RDD operations and UDFs run on the workers).

ref:
https://spark.apache.org/docs/latest/rdd-programming-guide.html#passing-functions-to-spark
https://engineering.sharethrough.com/blog/2013/09/13/top-3-troubleshooting-tips-to-keep-you-sparking/

Spark automatically creates closures:

  • for functions that run on RDDs at workers,
  • and for any global variables that are used by those workers.

One closure is sent per worker for every task. Closures travel one way, from the driver to the worker.
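
A minimal sketch of the rule, assuming the sc from the "Access SparkSession" section: only plain, serializable values should be captured by a closure, never the SparkContext itself:

// WRONG: referencing spark.sparkContext inside the closure ships it to the workers
// and raises the SPARK-5063 error
// sc.parallelize(1 to 10).map(x => spark.sparkContext.parallelize(Seq(x)).count())

// RIGHT: only the plain value multiplier is captured and serialized with the closure
val multiplier = 2
val doubled = sc.parallelize(1 to 10).map(_ * multiplier)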

ref:
https://gerardnico.com/wiki/spark/closure

Unable to find encoder for type stored in a Dataset

Unable to find encoder for type stored in a Dataset.  Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases. someDF.as[SomeCaseClass]

Solution:

import spark.implicits._

yourDF.as[YourCaseClass]

ref:
https://stackoverflow.com/questions/38664972/why-is-unable-to-find-encoder-for-type-stored-in-a-dataset-when-creating-a-dat

Task not serializable

Caused by: java.io.NotSerializableException: Settings
Serialization stack:
    - object not serializable (class: Settings, value: Settings@...)
    - field (class: Settings$$anonfun$1, name: $outer, type: class Settings)
    - object (class Settings$$anonfun$1, <function1>)
Caused by: org.apache.spark.SparkException:
    Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)

This usually happens when a closure references an object defined in the driver program: Spark automatically serializes the referenced object and ships it to the worker nodes, so if that object or its class can't be serialized, you get this error.
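
Two common fixes, sketched with a hypothetical Settings class and textRDD (these names are assumptions, not taken from the stack trace above):

// 1. make the referenced class serializable
class Settings(val keyword: String) extends Serializable
val settings = new Settings("spark")

// 2. or copy just the field you need into a local val, so only that plain value
//    is captured by the closure instead of the whole settings object
val keyword = settings.keyword
val matchedRDD = textRDD.filter((text: String) => text.contains(keyword))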

ref:
https://www.safaribooksonline.com/library/view/spark-the-definitive/9781491912201/ch04.html#user-defined-functions
http://www.puroguramingu.com/2016/02/26/spark-dos-donts.html
https://stackoverflow.com/questions/36176011/spark-sql-udf-task-not-serialisable
https://stackoverflow.com/questions/22592811/task-not-serializable-java-io-notserializableexception-when-calling-function-ou
https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/troubleshooting/javaionotserializableexception.html
https://mp.weixin.qq.com/s/BT6sXZlHcufAFLgTONCHsg

If you only run into this error in a Databricks notebook, try package cells; notebooks work a little differently from regular Spark applications.

ref:
https://docs.databricks.com/user-guide/notebooks/package-cells.html

java.lang.IllegalStateException: Cannot find any build directories.

java.lang.IllegalStateException: Cannot find any build directories.
    at org.apache.spark.launcher.CommandBuilderUtils.checkState(CommandBuilderUtils.java:248)
    at org.apache.spark.launcher.AbstractCommandBuilder.getScalaVersion(AbstractCommandBuilder.java:240)
    at org.apache.spark.launcher.AbstractCommandBuilder.buildClassPath(AbstractCommandBuilder.java:194)
    at org.apache.spark.launcher.AbstractCommandBuilder.buildJavaCommand(AbstractCommandBuilder.java:117)
    at org.apache.spark.launcher.WorkerCommandBuilder.buildCommand(WorkerCommandBuilder.scala:39)
    at org.apache.spark.launcher.WorkerCommandBuilder.buildCommand(WorkerCommandBuilder.scala:45)
    at org.apache.spark.deploy.worker.CommandUtils$.buildCommandSeq(CommandUtils.scala:63)
    at org.apache.spark.deploy.worker.CommandUtils$.buildProcessBuilder(CommandUtils.scala:51)
    at org.apache.spark.deploy.worker.ExecutorRunner.org$apache$spark$deploy$worker$ExecutorRunner$$fetchAndRunExecutor(ExecutorRunner.scala:145)
    at org.apache.spark.deploy.worker.ExecutorRunner$$anon$1.run(ExecutorRunner.scala:73)

A possible cause is that SPARK_HOME is not set, or that your launch script doesn't pick up that environment variable.

Run interactive notebooks with Spark and Scala

Databricks

ref:
https://databricks.com
https://docs.databricks.com/user-guide/notebooks/index.html

Zeppelin

$ brew install apache-zeppelin
$ zeppelin-daemon.sh start
$ zeppelin-daemon.sh stop

$ cd /usr/local/Cellar/apache-zeppelin/0.7.2/libexec/
$ cp conf/zeppelin-env.sh.template conf/zeppelin-env.sh
# export SPARK_HOME="/usr/local/share/apache-spark/spark-2.1.0"
# export ZEPPELIN_NOTEBOOK_DIR="/Users/vinta/Projects/albedo/notebooks"

$ cd /usr/local/Cellar/apache-zeppelin/0.7.2/libexec/
$ tail -fn 500 zeppelin-interpreter-spark-vinta-Asurada.local.log
$ tail -fn 500 zeppelin-vinta-Asurada.local.log

# Zeppelin UI
$ open http://localhost:8080/

# Spark UI
$ open http://localhost:4040/jobs/

ref:
http://zeppelin.apache.org/docs/0.7.1/manual/interpreterinstallation.html
http://zeppelin.apache.org/docs/0.7.1/install/configuration.html
http://zeppelin.apache.org/docs/0.7.1/interpreter/spark.html

Jupyter

$ git clone https://github.com/apache/incubator-toree
$ cd incubator-toree/

# this might take a very long time
$ APACHE_SPARK_VERSION=2.1.0 make pip-release
$ pip install dist/toree-pip/toree-0.2.0.dev1.tar.gz

$ jupyter toree install --replace --spark_home=$SPARK_HOME --kernel_name="Spark"
$ jupyter notebook

ref:
http://blog.thedataincubator.com/2017/04/spark-2-0-on-jupyter-with-toree/

Setup Spark on macOS

Install

First, you need Java 8 JDK.
http://www.oracle.com/technetwork/java/javase/downloads/index.html

$ java -version
java version "1.8.0_131"

Homebrew Version

$ brew update
$ brew install maven apache-spark

Pre-built Version

$ mkdir -p /usr/local/share/apache-spark && \
  cd /usr/local/share/apache-spark && \
  wget https://www.apache.org/dyn/closer.lua/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz && \
  tar -xvzf spark-2.2.0-bin-hadoop2.7.tgz

ref:
http://spark.apache.org/downloads.html

Build Version

This is the recommended way.

$ brew install scala@2.11
$ export PATH="/usr/local/opt/scala@2.11/bin:$PATH"
$ scala -version
Scala code runner version 2.11.8 -- Copyright 2002-2016, LAMP/EPFL

$ mkdir -p /usr/local/share/apache-spark && \
  cd /usr/local/share/apache-spark && \
  wget https://d3kbcqa49mib13.cloudfront.net/spark-2.2.0.tgz && \
  tar -xvzf spark-2.2.0.tgz && \
  cd spark-2.2.0

$ ./build/mvn -Pnetlib-lgpl -Pyarn -Phadoop-2.7 -Dhadoop.version=2.7.3 -DskipTests -T 4C package
# or
$ ./build/mvn -Pnetlib-lgpl -Pyarn -Phadoop-2.7 -Dhadoop.version=2.7.3 -DskipTests clean package
$ spark-shell --packages "com.github.fommil.netlib:all:1.1.2"
scala> import com.github.fommil.netlib.BLAS
import com.github.fommil.netlib.BLAS
scala> BLAS.getInstance().getClass().getName()
res1: String = com.github.fommil.netlib.NativeSystemBLAS

ref:
http://spark.apache.org/downloads.html
http://spark.apache.org/docs/latest/building-spark.html
http://spark.apache.org/docs/latest/ml-guide.html#dependencies

Configurations

in .zshrc

if which java > /dev/null; then
  export JAVA_HOME="$(/usr/libexec/java_home -v 1.8)"
  export PATH="$JAVA_HOME/bin:$PATH"
fi

export PATH="/usr/local/opt/[email protected]/bin:$PATH"

# homebrew version
export SPARK_HOME="/usr/local/Cellar/apache-spark/2.2.0/libexec"
export PYTHONPATH="$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.4-src.zip:$PYTHONPATH"
export PYSPARK_DRIVER_PYTHON="ipython"

# pre-built version
export SPARK_HOME="/usr/local/share/apache-spark/spark-2.2.0-bin-hadoop2.7"
export PATH="$SPARK_HOME/bin:$PATH"
export PYTHONPATH="$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.4-src.zip:$PYTHONPATH"

# build version
export SPARK_HOME="/usr/local/share/apache-spark/spark-2.2.0"
export PATH="$SPARK_HOME/bin:$PATH"
export PYTHONPATH="$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.4-src.zip:$PYTHONPATH"

ref:
https://spark.apache.org/docs/latest/programming-guide.html
https://spark.apache.org/docs/latest/configuration.html

$ cd $SPARK_HOME

$ cp conf/spark-defaults.conf.template conf/spark-defaults.conf
spark.driver.memory              4g
spark.executor.memory            4g
spark.jars.packages              com.github.fommil.netlib:all:1.1.2,mysql:mysql-connector-java:5.1.41
spark.serializer                 org.apache.spark.serializer.KryoSerializer

$ cp conf/spark-env.sh.template conf/spark-env.sh
export PYTHONHASHSEED=42

$ cp conf/log4j.properties.template conf/log4j.properties

ref:
https://spark.apache.org/docs/latest/configuration.html

Commands

Local Mode

$ spark-shell

$ export PYSPARK_DRIVER_PYTHON="jupyter" && \
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --ip 0.0.0.0" && \
pyspark \
--packages "com.github.fommil.netlib:all:1.1.2,mysql:mysql-connector-java:5.1.41" \
--driver-memory 4g \
--executor-memory 4g \
--master "local[*]"

$ spark-shell \
--packages "com.github.fommil.netlib:all:1.1.2,mysql:mysql-connector-java:5.1.41"
--master "local-cluster[3, 1, 4096]"

# Spark Application UI on the driver
$ open http://localhost:4040/

ref:
https://spark.apache.org/docs/latest/programming-guide.html

Standalone mode

There are two deploy modes for Spark Standalone. In client mode, the driver is launched in the same process as the client that submits the application. In cluster mode, however, the driver is launched on one of the Worker processes.

$ ./sbin/start-master.sh -h localhost
$ ./sbin/start-slave.sh spark://localhost:7077

# Spark Web UI on the cluster manager
$ open http://localhost:8080/

$ pyspark \
--driver-memory 4g \
--executor-memory 4g \
--master spark://localhost:7077

$ spark-submit \
--master spark://localhost:7077 \
examples/src/main/python/pi.py 10

$ spark-submit \
--driver-memory 2g \
--driver-java-options "-XX:ThreadStackSize=81920" \
--total-executor-cores 3 \
--executor-cores 3 \
--executor-memory 12g \
--conf "spark.executor.extraJavaOptions=-XX:ThreadStackSize=81920" \
--master spark://localhost:7077 \
--packages "mysql:mysql-connector-java:5.1.41,com.hankcs:hanlp:portable-1.3.4,edu.stanford.nlp:stanford-corenlp:3.7.0" \
--jars "/Users/vinta/Projects/albedo/spark-data/stanford-corenlp-3.8.0-models.jar" \
--class ws.vinta.albedo.LogisticRegressionRanker \
target/albedo-1.0.0-SNAPSHOT.jar

# Spark Application UI on the driver
$ open http://localhost:4040/

ref:
https://spark.apache.org/docs/latest/spark-standalone.html
https://spark.apache.org/docs/latest/submitting-applications.html
https://spark.apache.org/docs/latest/configuration.html