Spark best practices

Architecture

A Spark application is a set of processes running on a cluster, all coordinated by the driver program. The driver program is 1) the process where the main() method of your program runs, and 2) the process running the code that creates a SparkSession, RDDs, and DataFrames, and that stages up or sends off transformations and actions.

The processes that run computations and store data for your application are the executors. Executors 1) return computed results to the driver program, and 2) provide in-memory storage for cached RDDs/DataFrames.

Execution of a Spark program:

  • The driver program runs the Spark application, which creates a SparkSession upon start-up.
  • The SparkSession connects to a cluster manager (e.g., YARN/Mesos), which allocates resources.
  • Spark acquires executors on nodes in the cluster: processes that run computations and store data for your application. Actions happen on the workers.
  • The driver program sends your application code (e.g., functions passed to map()) to the executors.
  • Transformations and actions are queued up and optimized by the driver program, sent to the executors to run, and then the executors send the results back to the driver program (a minimal code sketch of this flow follows this list).
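
A minimal sketch of this flow, assuming Spark is on the classpath (the object name and input path are illustrative, not from the original):

import org.apache.spark.sql.SparkSession

object WordCountApp {
  def main(args: Array[String]): Unit = {
    // Driver process: main() runs here and creates the SparkSession.
    val spark = SparkSession.builder()
      .appName("word-count")
      .getOrCreate()

    // Transformations are only staged by the driver; they run on the
    // executors once an action is triggered.
    val counts = spark.sparkContext
      .textFile("hdfs:///tmp/input.txt")   // hypothetical input path
      .flatMap(_.split("\\s+"))
      .map(word => (word, 1))
      .reduceByKey(_ + _)

    // collect() is an action: the executors compute their partitions
    // and send the results back to the driver.
    counts.collect().foreach(println)

    spark.stop()
  }
}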

Glossary:

  • Driver program: the master node. It creates the SparkSession, and your Spark app's main() runs on it.
  • Worker node: the machines in the cluster other than the driver; these are the machines that actually perform the distributed computation. Basically, one machine is one worker (a.k.a. slave).
  • Executor: the individual processes on a worker node; typically one core corresponds to one executor.
  • Cluster Manager: manages the cluster's resources, usually YARN. The driver program and the workers communicate through the cluster manager.

ref:
https://spark.apache.org/docs/latest/cluster-overview.html
https://databricks.com/blog/2016/06/22/apache-spark-key-terms-explained.html

Resilient Distributed Dataset (RDD)

RDDs are divided into partitions: each partition can be considered an immutable subset of the entire RDD. When you execute your Spark program, each partition is sent to a worker.

Operations on an RDD fall into two categories:

  • Every operation that returns an RDD is a transformation
    • Transformations lazily create a new RDD, e.g. map(), flatMap(), filter()
  • Every operation that does not return an RDD is an action
    • Actions are eagerly executed, e.g. reduce(), count(), collect()

Take someRDD.foreach(println) as an example: foreach is an action because it returns Unit. The println actually prints to the executors' stdout, so you will see nothing on the driver program, unless you change it to someRDD.collect().foreach(println).

Take someRDD.take(10) as an example: take(10) is actually computed on the executors, but the 10 results are sent back to the driver program.
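
A small sketch of the difference, assuming a SparkSession named spark is in scope:

val someRDD = spark.sparkContext.parallelize(Seq("a", "b", "c"))

// Runs on the executors: the output goes to each executor's stdout,
// not to the driver's console.
someRDD.foreach(println)

// collect() ships every element back to the driver first, so println
// now prints on the driver. Only safe for small RDDs.
someRDD.collect().foreach(println)

// take(10) is also computed on the executors, but only the first
// 10 elements are returned to the driver.
someRDD.take(10).foreach(println)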

ref:
https://zhangyi.gitbooks.io/spark-in-action/content/chapter2/rdd.html

Take logRDD.filter(line => line.contains("ERROR")).take(10) as an example: the benefit of filter() being lazy is that Spark knows it only needs take(10) at the end, so as soon as the filter has collected 10 matching lines it can stop, instead of filtering the entire dataset.

Take logRDD.map(_.toLowerCase).filter(_.contains("error")).count() as an example: although map() and filter() are two separate operations, because both are lazily evaluated, Spark can figure out at the count() stage that it can apply .toLowerCase and .contains("error") together while reading each line of the log.

By default, RDDs are recomputed each time you run an action on them! Spark allows you to control what should be cached in memory.
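
For example, a minimal caching sketch, assuming logRDD already exists:

// Without cache(), both actions below would recompute the whole
// map/filter chain from the original data source.
val errors = logRDD
  .map(_.toLowerCase)
  .filter(_.contains("error"))
  .cache()                      // same as persist(StorageLevel.MEMORY_ONLY)

val total = errors.count()      // first action: computes and caches the partitions
val sample = errors.take(10)    // second action: served from the in-memory cache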

ref:
https://www.coursera.org/learn/scala-spark-big-data/lecture/0GZV7/evaluation-in-spark-unlike-scala-collections

Shuffling

Moving data from one node to another across the network is called "shuffling". A shuffle can occur when the resulting RDD depends on other elements from the same RDD or from another RDD. You can figure out whether a shuffle has been planned via 1) the return type of certain transformations (e.g. ShuffledRDD), or 2) using toDebugString to see the execution plan.
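
A sketch of both checks, assuming a SparkSession named spark is in scope:

val pairs = spark.sparkContext.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val grouped = pairs.groupByKey()

// 1) The return type hints at a shuffle: grouped is typically backed
//    by a ShuffledRDD.
println(grouped.getClass.getSimpleName)

// 2) toDebugString prints the lineage; a ShuffledRDD stage in the plan
//    means a shuffle has been scheduled.
println(grouped.toDebugString)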

Operations that might cause a shuffle:

  • cogroup
  • groupWith
  • join
  • leftOuterJoin
  • rightOuterJoin
  • groupByKey
  • reduceByKey
  • combineByKey
  • distinct
  • intersection
  • repartition
  • coalesce

Narrow dependencies:

  • map
  • mapValues
  • flatMap
  • filter
  • mapPartitions
  • mapPartitionsWithIndex

Wide dependencies:

  • cogroup
  • groupWith
  • join
  • leftOuterJoin
  • rightOuterJoin
  • groupByKey
  • reduceByKey
  • combineByKey
  • distinct
  • intersection

ref:
https://www.coursera.org/learn/scala-spark-big-data/lecture/bT1YR/shuffling-what-it-is-and-why-its-important

Partitioning

Partitions never span multiple machines; data in the same partition is guaranteed to be on the same machine. The default number of partitions is the total number of cores on all executor nodes.

Customizing partitions is only possible when working with Pair RDDs, because partitioning is done based on keys. The most important thing is that you must cache() or persist() your RDDs after re-partitioning.
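
A minimal sketch of re-partitioning a Pair RDD and persisting the result, assuming a SparkSession named spark is in scope:

import org.apache.spark.HashPartitioner

val events = spark.sparkContext
  .parallelize(Seq((1, "click"), (2, "view"), (1, "buy")))

// Re-partition by key into 100 partitions, then persist the result.
// Without persist(), every later action would repeat the shuffle.
val partitioned = events
  .partitionBy(new HashPartitioner(100))
  .persist()

println(partitioned.partitioner)   // Some(org.apache.spark.HashPartitioner@...)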

The following operations hold on to (and propagate) a partitioner:

  • cogroup
  • groupWith
  • groupByKey
  • reduceByKey
  • foldByKey
  • combineByKey
  • partitionBy
  • join
  • sort
  • mapValues (if parent has a partitioner)
  • flatMapValues (if parent has a partitioner)
  • filter (if parent has a partitioner)

All other operations (e.g. map()) will produce a result without a partitioner.
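
For example, a small sketch of how the partitioner is kept or dropped (the sample data is made up, and a SparkSession named spark is assumed):

import org.apache.spark.HashPartitioner

val pairs = spark.sparkContext
  .parallelize(Seq((1, "click"), (2, "view")))
  .partitionBy(new HashPartitioner(4))

// mapValues cannot change the keys, so the partitioner is propagated.
println(pairs.mapValues(_.length).partitioner)                    // Some(...)

// map could change the keys, so Spark drops the partitioner.
println(pairs.map { case (k, v) => (k, v.length) }.partitioner)   // None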

ref:
https://www.coursera.org/learn/scala-spark-big-data/lecture/Vkhm0/partitioning

You can chain a call to .repartition(n) after reading the text file to specify a different and larger number of partitions. For example, you might make the number of partitions equal the number of cores in your cluster.
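
A sketch of that pattern, assuming a SparkSession named spark is in scope (the path and core count are placeholders):

// Suppose the cluster has 4 executors x 8 cores = 32 cores in total.
val numCores = 32

val lines = spark.sparkContext
  .textFile("hdfs:///logs/app.log")   // hypothetical path
  .repartition(numCores)              // one partition per core for full parallelism

println(lines.getNumPartitions)       // 32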

ref:
https://www.safaribooksonline.com/library/view/learning-apache-spark/9781785885136/ch01s04.html
https://umbertogriffo.gitbooks.io/apache-spark-best-practices-and-tuning/content/sparksqlshufflepartitions_draft.html

It's really important to repartition your dataset if you are going to cache it and use it for queries. The optimal number of partitions is around 4-6 per executor core; with 40 nodes and 6 executor cores each, we use 1000 partitions for best performance.
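
A rough sketch of that rule of thumb, assuming a DataFrame named df that you want to cache for interactive queries:

// 40 nodes x 6 executor cores = 240 cores; 240 x ~4 ≈ 1000 partitions.
val numPartitions = 1000

val cached = df.repartition(numPartitions).cache()
cached.count()   // materialize the cache before running queries against it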

ref:
https://databricks.com/blog/2015/10/13/interactive-audience-analytics-with-apache-spark-and-hyperloglog.html
https://stackoverflow.com/questions/40416357/spark-sql-difference-between-df-repartition-and-dataframewriter-partitionby

Dataset

The Dataset transformation API is split into untyped and typed transformations. Untyped APIs return a DataFrame, so you lose the type; typed APIs return a Dataset. For an API like select(), you just need to add explicit type casting, e.g. ds.select($"name".as[String], $"age".as[Int]), and the result will be a Dataset instead of a DataFrame.

Untyped transformations:

  • Almost all of the transformations that are available on DataFrames, e.g. groupBy()

Typed transformations:

  • map()
  • flatMap()
  • reduce()
  • groupByKey()
  • agg()
  • mapGroups()
  • flatMapGroups()
  • reduceGroups()

Datasets don't get all of the optimizations that DataFrames get!

Prefer Spark SQL-style relational operations such as filter($"city".as[String] === "Boston") and select($"age".as[Int]), and avoid higher-order functions (i.e. functional operations) such as filter(p => p.city == "Boston") and map(): Catalyst cannot optimize the latter well.
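
A sketch of both styles on the same Dataset, using a hypothetical Person case class and assuming a SparkSession named spark is in scope:

case class Person(name: String, age: Int, city: String)

import spark.implicits._

val people = Seq(
  Person("Amy", 30, "Boston"),
  Person("Bob", 40, "Taipei")
).toDS()

// Relational style: Catalyst sees the column and the literal,
// so it can optimize (and push down) the predicate.
val relational = people.filter($"city" === "Boston")

// Functional style: the lambda is a black box to Catalyst, so every
// row has to be deserialized into a Person before the test can run.
val functional = people.filter(p => p.city == "Boston")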

ref:
https://www.coursera.org/learn/scala-spark-big-data/lecture/yrfPh/datasets
https://www.51zero.com/blog/2016/2/24/type-safety-on-spark-dataframes-part-1

Encoders

Encoders are what convert your data between JVM objects and Spark SQL's specialized internal representation. They're required by all Datasets.

Two ways to introduce Encoders:

  • Automatically, via import spark.implicits._
  • Explicitly, via org.apache.spark.sql.Encoder (see the sketch below)
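
A minimal sketch of both ways, using a hypothetical User case class and assuming a SparkSession named spark is in scope:

import org.apache.spark.sql.{Encoder, Encoders}

case class User(id: Long, name: String)

// 1) Implicitly: spark.implicits._ brings encoders for case classes
//    and common primitive types into scope.
import spark.implicits._
val ds1 = Seq(User(1L, "Amy")).toDS()

// 2) Explicitly: build an Encoder yourself and pass it where needed.
val userEncoder: Encoder[User] = Encoders.product[User]
val ds2 = spark.createDataset(Seq(User(2L, "Bob")))(userEncoder)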

ref:
https://www.coursera.org/learn/scala-spark-big-data/lecture/yrfPh/datasets

Broadcast

If a DataFrame is small enough to fit on a single machine, you can broadcast it.
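
A sketch of a broadcast join, assuming largeDF and smallDF already exist and share a user_id column:

import org.apache.spark.sql.functions.broadcast

// Hint Spark to ship smallDF to every executor instead of shuffling largeDF.
// Spark also does this automatically when the table size is below
// spark.sql.autoBroadcastJoinThreshold (10 MB by default).
val joined = largeDF.join(broadcast(smallDF), Seq("user_id"))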

ref:
https://umbertogriffo.gitbooks.io/apache-spark-best-practices-and-tuning/content/when_to_use_broadcast_variable.html

Spark UI

If your dataset is large, you can try repartitioning it to a larger number of partitions to allow more parallelism in your job. A good indication of this in the Spark UI is that you don't have a lot of tasks, but each task is very slow to complete.

ref:
https://databricks.com/blog/2015/06/22/understanding-your-spark-application-through-visualization.html
https://databricks.com/blog/2016/10/18/7-tips-to-debug-apache-spark-code-faster-with-databricks.html
https://docs.databricks.com/spark/latest/rdd-streaming/debugging-streaming-applications.html

Configurations

ref:
https://spark.apache.org/docs/latest/configuration.html
https://spark.apache.org/docs/latest/tuning.html#memory-tuning

My spark-defaults.conf

spark.driver.maxResultSize       2g
spark.jars.packages              com.github.fommil.netlib:all:1.1.2,com.hankcs:hanlp:portable-1.3.4,mysql:mysql-connector-java:5.1.41,org.apache.httpcomponents:httpclient:4.5.2,org.elasticsearch.client:elasticsearch-rest-high-level-client:5.6.2
spark.kryoserializer.buffer.max  1g
spark.serializer                 org.apache.spark.serializer.KryoSerializer

Allocate Resources (Executors, Cores, and Memory)

When submitting a Spark application via spark-submit, you may specify the following options:

  • --driver-memory MEM: Memory for driver (e.g. 1000M, 2G) (Default: 1024M).
  • --executor-memory MEM: Memory per executor (e.g. 1000M, 2G) (Default: 1G).

YARN and Spark standalone only:

  • --executor-cores NUM: Number of cores per executor. (Default: 1 in YARN mode, or all available cores on the worker in standalone mode)

YARN only:

  • --driver-cores NUM: Number of cores used by the driver, only in cluster mode (Default: 1).
  • --num-executors NUM: Number of executors to launch (Default: 2).

Example calculation (a matching spark-submit command is sketched after this list):

driver: 2 cores, 7.5 GB x 1
worker: 8 cores, 30 GB x 4

  • actual memory for driver = driver memory - (10% overhead x 2)
    • 7.5 - (7.5 x 0.1 x 2) = 6
    • spark.driver.memory=6g
  • cores per executor = 5
    • for good HDFS throughput
    • spark.executor.cores=5
    • --executor-cores 5
  • total cores in cluster = (cores per node - 1) x total nodes
    • leave 1 core per node for YARN daemons
    • (8 - 1) x 4 = 28
  • total executors in cluster = (total cores in cluster / cores per executor) - 1
    • leave 1 executor for ApplicationManager
    • (28 / 5) - 1 = 4
    • spark.executor.instances=4
    • --num-executors 4
  • executors per node = (total executors in cluster + 1) / total nodes
    • (4 + 1) / 4 = 1
  • memory per executor = (memory per node / executors per node) - (10% overhead x 2) - 3
    • (30 / 1) - (30 x 0.1 x 2) - 3 = 21
    • spark.executor.memory=21g
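
Putting the numbers above together, the resulting spark-submit call would look roughly like this (the jar name is a placeholder):

spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --driver-memory 6g \
  --executor-cores 5 \
  --num-executors 4 \
  --executor-memory 21g \
  your_application.jar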

ref:
https://spark.apache.org/docs/latest/submitting-applications.html
https://spark.apache.org/docs/latest/configuration.html
https://spoddutur.github.io/spark-notes/distribution_of_executors_cores_and_memory_for_spark_application
https://www.slideshare.net/cloudera/top-5-mistakes-to-avoid-when-writing-apache-spark-applications/21
http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/