Some of the methods below are available on every RDD, while others only exist on PairRDDs. Because I used Spark's Python API and Scala API in different projects, the examples below mix both. With the Scala API, pay close attention to the types each method accepts and returns; the Python API has no such constraint, since Python is a dynamic language.
ref:
http://spark.apache.org/docs/latest/programming-guide.html#transformations
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions
RDD Methods
map()
The function signature is def map[U](f: (T) ⇒ U): RDD[U].
The function passed to map() takes each element of the RDD (each row, from the DataFrame point of view) as its input: func(row), and returns an arbitrary object (e.g., an int, a string, or a tuple). So after map() you get an RDD with the same number of rows, but the element type may change.
matrix = [
    (1, 1, 1),
    (1, 2, 1),
    (1, 3, 1),
    (1, 6, 0),
    (2, 6, 1),
    (3, 1, 1),
    (3, 5, 1),
    (4, 1, 0),
    (4, 4, 1),
]
df = spark.createDataFrame(matrix, ['k', 'v', 'whatever'])

df.rdd.map(lambda row: (row[0], row[1])).collect()
# [(1, 1), (1, 2), (1, 3), (1, 6), (2, 6), (3, 1), (3, 5), (4, 1), (4, 4)]
import org.apache.spark.rdd.RDD

val rdd: RDD[(String, Int)] = sc.parallelize(Seq(
  ("Python", 1),
  ("Scala", 3),
  ("Java", 2)
))

val test: RDD[String] = rdd.map({
  case (lang, count) => {
    s"${lang}_${count}"
  }
})

test.collect().foreach(println)
// Python_1
// Scala_3
// Java_2
flatMap()
The function signature is def flatMap[U](f: (T) ⇒ TraversableOnce[U]): RDD[U].
flatMap() is very similar to map(): the function it takes also receives each row of the RDD as input: func(row). The difference is that the function passed to flatMap() must return an iterable (e.g., a tuple or a list, possibly empty), and flatMap() flattens the returned results. As a result, the count() after flatMap() may be larger or smaller than that of the original RDD.
matrix = [
    (1, 1, 1),
    (1, 2, 1),
    (1, 3, 1),
    (1, 6, 0),
    (2, 6, 1),
    (3, 1, 1),
    (3, 5, 1),
    (4, 1, 0),
    (4, 4, 1),
]
df = spark.createDataFrame(matrix, ['k', 'v', 'whatever'])

df.rdd.flatMap(lambda row: (row[0], row[1])).collect()
# [1, 1, 1, 2, 1, 3, 1, 6, 2, 6, 3, 1, 3, 5, 4, 1, 4, 4]

df.rdd.flatMap(lambda row: (row[2], )).collect()
# [1, 1, 1, 0, 1, 1, 1, 0, 1]
ref:
http://backtobazics.com/big-data/spark/apache-spark-flatmap-example/
http://apachesparkbook.blogspot.tw/2015/05/difference-between-map-flatmap-in-rdd.html
http://stackoverflow.com/questions/42867457/map-each-element-of-a-list-in-spark
http://stackoverflow.com/questions/21096432/list-or-iterator-of-tuples-returned-by-map-pyspark
reduce()
The function signature is def reduce(f: (T, T) ⇒ T): T.
The function passed to reduce() takes pairs of RDD elements as input: func(element1, element2), and returns a single object of the same type as its inputs. In the end, reduce() yields a single value.
array = [
    1,
    5,
    4,
    2,
    3,
]
rdd = sc.parallelize(array)

rdd.reduce(lambda element1, element2: element1 + element2)
# 15

def max(element1, element2):
    return element1 if element1 > element2 else element2

rdd.reduce(max)
# 5
treeReduce()
The function signature is def treeReduce(f: (T, T) ⇒ T, depth: Int = 2): T.
A plain reduce() sends each partition's partial result straight back to the driver machine for the final computation, which can become a bottleneck when there are many partitions and each partition holds a lot of data. In that case you can switch to treeReduce(), which merges the partial results in multiple stages on the executors first, although using it inappropriately can actually make things worse. The same relationship holds between aggregate() and treeAggregate().
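A minimal sketch in PySpark, assuming the same sc and array as in the reduce() example above; treeReduce() is called just like reduce(), with an optional depth:

array = [1, 5, 4, 2, 3]
rdd = sc.parallelize(array, 4)  # spread the data across 4 partitions

# partial sums are combined on the executors in a tree of depth 2
# before the final value reaches the driver
rdd.treeReduce(lambda element1, element2: element1 + element2, depth=2)
# 15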
aggregate()
The function signature is def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U): U.
Unlike reduce(), the result type U may differ from the element type T: seqOp folds each element into an accumulator within a partition, and combOp merges the per-partition accumulators.
import org.apache.spark.rdd.RDD

case class WikipediaArticle(title: String, text: String) {
  def mentionsLanguage(lang: String): Boolean = text.split(' ').contains(lang)
}

val wikiRdd: RDD[WikipediaArticle] = sc.parallelize(Seq(
  WikipediaArticle("a1", "abc Scala xyz"),
  WikipediaArticle("a2", "abc Python xyz"),
  WikipediaArticle("a3", "abc Scala xyz"),
  WikipediaArticle("a4", "abc Scala xyz")
))

def occurrencesOfLang(lang: String, rdd: RDD[WikipediaArticle]): Int = {
  val accumulateOp = (total: Int, a: WikipediaArticle) => total + 1
  val combineOp = (count1: Int, count2: Int) => count1 + count2
  rdd.filter(_.mentionsLanguage(lang)).aggregate(0)(accumulateOp, combineOp)
}

occurrencesOfLang("Scala", wikiRdd)
// 3
ref:
https://stackoverflow.com/questions/28240706/explain-the-aggregate-functionality-in-spark
PairRDD Methods
A PairRDD is an RDD of key/value pairs, i.e., RDD[(K, V)]. K and V can be primitive types, but they can also be other objects or collections. The methods in PairRDDFunctions likewise return RDDs of the form RDD[(K, V)].
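For example, a tiny PySpark sketch (with made-up data) that builds a PairRDD from a plain RDD by extracting a key:

# key each word by its first letter, giving an RDD of (str, str) pairs
words = sc.parallelize(['apple', 'avocado', 'banana'])
pairs = words.map(lambda word: (word[0], word))

pairs.collect()
# [('a', 'apple'), ('a', 'avocado'), ('b', 'banana')]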
ref:
https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html
mapValues()
The function signature is def mapValues[U](f: (V) ⇒ U): RDD[(K, U)].
The function passed to mapValues() takes a single argument, the PairRDD's value V; it transforms only the values and leaves the keys untouched.
val rdd = sc.parallelize(Seq(
  ("Python", 1),
  ("Scala", 3),
  ("Java", 2)
))

rdd
  .mapValues((count: Int) => {
    count * 10
  })
  .collect()
// Array((Python,10), (Scala,30), (Java,20))
ref:
http://apachesparkbook.blogspot.tw/2015/12/mapvalues-example.html
groupByKey()
The function signature is def groupByKey(): RDD[(K, Iterable[V])].
Avoid groupByKey() in the following cases:
- If the operation is expressed as a groupByKey followed by an associative and commutative reducing operation on the values (sum, count, max/min), it should be replaced by reduceByKey (a short sketch follows this list).
- If the operation can be expressed as a combination of a local sequence operation and a merge operation (online variance / mean, top-n observations), it should be expressed with combineByKey or aggregateByKey.
- If the final goal is to traverse the values in a specific order (groupByKey followed by sorting the values followed by iteration), it can typically be rewritten as repartitionAndSortWithinPartitions with a custom partitioner and ordering, followed by mapPartitions.
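A rough PySpark sketch of the first point, using a made-up pairs RDD:

pairs = sc.parallelize([(1, 1), (1, 2), (2, 6), (3, 1), (3, 5)])

# groupByKey() shuffles every value across the network before summing
sorted(pairs.groupByKey().mapValues(sum).collect())
# [(1, 3), (2, 6), (3, 6)]

# reduceByKey() pre-aggregates within each partition, shuffling far less data
sorted(pairs.reduceByKey(lambda v1, v2: v1 + v2).collect())
# [(1, 3), (2, 6), (3, 6)]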
case class WikipediaArticle(title: String, text: String) {
  def mentionsLanguage(lang: String): Boolean = text.split(' ').contains(lang)
}

val langs = List(
  "Java", "PHP", "Python", "Scala"
)

val wikiRdd: RDD[WikipediaArticle] = sc.parallelize(Seq(
  WikipediaArticle("a1", "a1 Scala"),
  WikipediaArticle("a2", "a2 Python"),
  WikipediaArticle("a3", "a3 Scala"),
  WikipediaArticle("a4", "a4 Java"),
  WikipediaArticle("a5", "a5 Java"),
  WikipediaArticle("a6", "a6 Scala")
))

def makeIndex(langs: List[String], rdd: RDD[WikipediaArticle]): RDD[(String, Iterable[WikipediaArticle])] = {
  rdd
    .flatMap((article: WikipediaArticle) => {
      langs
        .filter((lang: String) => {
          article.mentionsLanguage(lang)
        })
        .map((lang: String) => {
          (lang, article)
        })
    })
    .groupByKey()
}

def index: RDD[(String, Iterable[WikipediaArticle])] = makeIndex(langs, wikiRdd)
// Array(
//   (Java,CompactBuffer(WikipediaArticle(a4,a4 Java), WikipediaArticle(a5,a5 Java))),
//   (Python,CompactBuffer(WikipediaArticle(a2,a2 Python))),
//   (Scala,CompactBuffer(WikipediaArticle(a1,a1 Scala), WikipediaArticle(a3,a3 Scala), WikipediaArticle(a6,a6 Scala)))
// )

def rankLangsUsingIndex(index: RDD[(String, Iterable[WikipediaArticle])]): List[(String, Int)] = {
  index
    .map({
      case (lang, articles) => {
        (lang, articles.size)
      }
    })
    .sortBy(-_._2)
    .collect()
    .toList
}

rankLangsUsingIndex(index)
// List((Scala,3),(Java,2),(Python,1))
Spark Best Practices
https://github.com/beeva/beeva-best-practices/blob/master/big_data/spark/README.md
RDD actions and Transformations by Example
https://github.com/awesome-spark/spark-gotchas/blob/master/04_rdd_actions_and_transformations_by_example.md
Avoid groupByKey when performing an associative reductive operation
https://umbertogriffo.gitbooks.io/apache-spark-best-practices-and-tuning/content/avoid_groupbykey_when_performing_an_associative_re.html
reduceByKey()
The function signature is def reduceByKey(func: (V, V) ⇒ V): RDD[(K, V)].
The two arguments of the function passed to reduceByKey() are both of the PairRDD's value type V. For the same job, reduceByKey() performs much better than groupByKey() followed by reduce(), because it combines values locally on each partition before shuffling.
matrix = [
    (1, 1, 1),
    (1, 2, 1),
    (1, 3, 1),
    (1, 6, 0),
    (2, 6, 1),
    (3, 1, 1),
    (3, 5, 1),
    (4, 1, 0),
    (4, 4, 1),
]
df = spark.createDataFrame(matrix, ['k', 'v', 'whatever'])

df.rdd \
    .map(lambda row: (row[0], row[1])) \
    .reduceByKey(lambda x, y: x + y) \
    .collect()
# map() => [(1, 1), (1, 2), (1, 3), (1, 6), (2, 6), (3, 1), (3, 5), (4, 1), (4, 4)]
# reduceByKey() => [(1, 12), (2, 6), (3, 6), (4, 5)]
matrix = [
    (1, [[100.0, 1.0], [200.0, 2.0]]),
    (2, [[300.0, 3.0]]),
    (2, [[400.0, 4.0], [600.0, 6.0]]),
    (2, [[500.0, 5.0]]),
]
df = spark.createDataFrame(matrix, ['user', 'recommendations'])

candidate_k = 5  # hypothetical value, only for illustration

def merge_recommendations(recommendations1, recommendations2):
    return recommendations1 + recommendations2

def slice_recommendations(row, candidate_k):
    user, recommendations = row
    # each recommendation is an [item, rating] pair, so sort by the rating
    sliced_recommendations = sorted(recommendations, key=lambda recommendation: recommendation[1], reverse=True)[:candidate_k + 5]
    return (user, sliced_recommendations)

full_rdd = df \
    .rdd \
    .reduceByKey(lambda x, y: merge_recommendations(x, y)) \
    .map(lambda row: slice_recommendations(row, candidate_k))

final_df = spark.createDataFrame(full_rdd, ['user', 'recommendations'])
val langs = List(
  "Java", "PHP", "Python", "Scala"
)

val wikiRdd: RDD[WikipediaArticle] = sc.parallelize(Seq(
  WikipediaArticle("a1", "a1 Scala"),
  WikipediaArticle("a2", "a2 Python"),
  WikipediaArticle("a3", "a3 Scala"),
  WikipediaArticle("a4", "a4 Java"),
  WikipediaArticle("a5", "a5 Java"),
  WikipediaArticle("a6", "a6 Scala")
))

def rankLangsReduceByKey(langs: List[String], rdd: RDD[WikipediaArticle]): List[(String, Int)] = {
  rdd
    .flatMap((article: WikipediaArticle) => {
      langs
        .filter((lang: String) => {
          article.mentionsLanguage(lang)
        })
        .map((lang: String) => {
          (lang, 1)
        })
    })
    .reduceByKey((count1: Int, count2: Int) => {
      count1 + count2
    })
    .collect()
    .toList
    .sortWith(_._2 > _._2)
}

rankLangsReduceByKey(langs, wikiRdd)
// List((Scala,3),(Java,2),(Python,1))
Avoid reduceByKey when the input and output value types are different
http://backtobazics.com/big-data/spark/apache-spark-reducebykey-example/
Reduce a key-value pair into a key-list pair
http://stackoverflow.com/questions/27002161/reduce-a-key-value-pair-into-a-key-list-pair-with-apache-spark
foldByKey()
The function signature is def foldByKey(zeroValue: V)(func: (V, V) ⇒ V): RDD[(K, V)].
foldByKey() is basically reduceByKey() with a user-specified zero value.
from operator import add

matrix = [
    (1, 1, 1),
    (1, 2, 1),
    (1, 3, 1),
    (1, 6, 0),
    (2, 6, 1),
    (3, 1, 1),
    (3, 5, 1),
    (4, 1, 0),
    (4, 4, 1),
]
df = spark.createDataFrame(matrix, ['k', 'v', 'whatever'])

df.rdd \
    .map(lambda row: (row[0], [row[1], ])) \
    .foldByKey(list(), add) \
    .collect()
# [(4, [1, 4]), (1, [1, 2, 3, 6]), (2, [6]), (3, [1, 5])]
aggregateByKey()
The function signature is def aggregateByKey[U](zeroValue: U)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U): RDD[(K, U)].
It is a good replacement for patterns like .map(lambda row: (row['user'], [row['item'], ])).reduceByKey(lambda v1, v2: v1 + v2).
matrix = [
    (1, 1, 1),
    (1, 2, 1),
    (1, 4, 1),
    (1, 5, 1),
    (2, 2, 1),
    (2, 2, 1),
    (2, 3, 1),
    (3, 5, 1),
]
df = spark.createDataFrame(matrix, ['user', 'item', 'whatever'])

def seqFunc(item_set, item):
    item_set.add(item)
    return item_set

def combFunc(item_set1, item_set2):
    return item_set1.union(item_set2)

df.select('user', 'item').rdd \
    .aggregateByKey(set(), seqFunc, combFunc) \
    .collect()
# [(1, {1, 2, 4, 5}), (2, {2, 3}), (3, {5})]
ref:
http://codingjunkie.net/spark-agr-by-key/
https://stackoverflow.com/questions/31081563/apache-spark-what-is-the-equivalent-implementation-of-rdd-groupbykey-using-rd
https://stackoverflow.com/questions/24804619/how-does-spark-aggregate-function-aggregatebykey-work
combineByKey()
The function signature is def combineByKey[C](createCombiner: (V) ⇒ C, mergeValue: (C, V) ⇒ C, mergeCombiners: (C, C) ⇒ C): RDD[(K, C)].
combineByKey() turns an RDD[(K, V)] into an RDD[(K, C)], where C can be any type. It takes three functions:
- createCombiner, which turns a V into a C (e.g., creates a one-element list)
- mergeValue, to merge a V into a C (e.g., adds it to the end of a list)
- mergeCombiners, to combine two C’s into a single one.
matrix = [
    ('chinese', 80),
    ('math', 60),
    ('english', 100),
    ('chinese', 90),
    ('math', 100),
    ('math', 10),
    ('english', 70),
    ('english', 20),
    ('history', 30),
]
df = spark.createDataFrame(matrix, ['subject', 'score'])

def createCombiner(score):
    return (score, 1)

def mergeValue(accumulate, score):
    total_score = accumulate[0] + score
    total_count = accumulate[1] + 1
    return (total_score, total_count)

def mergeCombiners(accumulate1, accumulate2):
    total_score = accumulate1[0] + accumulate2[0]
    total_count = accumulate1[1] + accumulate2[1]
    return (total_score, total_count)

df.rdd.combineByKey(createCombiner, mergeValue, mergeCombiners).collect()
# you could calculate the average score of every subject
# [('chinese', (170, 2)),
#  ('history', (30, 1)),
#  ('math', (170, 3)),
#  ('english', (190, 3))]
ref:
https://zhangyi.gitbooks.io/spark-in-action/content/chapter2/rdd.html
cogroup()
The function signature is def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))].
If you want to join two RDDs that have each been grouped by key, use cogroup(). Avoid the flatMap + join + groupBy pattern.
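A minimal PySpark sketch, with made-up clicks/purchases RDDs:

clicks = sc.parallelize([(1, 'item_a'), (1, 'item_b'), (2, 'item_c')])
purchases = sc.parallelize([(1, 'item_b'), (3, 'item_d')])

# cogroup() groups both RDDs by key in a single shuffle;
# each value is a pair of iterables, one per input RDD
# (the lists are sorted here only to make the printout deterministic)
sorted(clicks.cogroup(purchases)
             .mapValues(lambda groups: (sorted(groups[0]), sorted(groups[1])))
             .collect())
# [(1, (['item_a', 'item_b'], ['item_b'])),
#  (2, (['item_c'], [])),
#  (3, ([], ['item_d']))]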