Some of the methods below are available on all RDDs, while others only exist on PairRDDs. Because I have used Spark's Python API and Scala API in different projects, the examples below mix both. With the Scala API you need to pay close attention to the types each method accepts and returns; the Python API has no such constraint, since Python is a dynamic language after all.
ref:
http://spark.apache.org/docs/latest/programming-guide.html#transformations
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions
RDD Methods
map()
The signature is def map[U](f: (T) ⇒ U): RDD[U].
The function passed to map() takes each element of the RDD as its input (from a DataFrame's point of view, each row): func(row), and returns an arbitrary object (e.g. an int, a string, or a tuple). So map() produces an RDD with the same number of rows as the original, though the element type may be different.
matrix = [
(1, 1, 1),
(1, 2, 1),
(1, 3, 1),
(1, 6, 0),
(2, 6, 1),
(3, 1, 1),
(3, 5, 1),
(4, 1, 0),
(4, 4, 1),
]
df = spark.createDataFrame(matrix, ['k', 'v', 'whatever'])
df.rdd.map(lambda row: (row[0], row[1])).collect()
# [(1, 1), (1, 2), (1, 3), (1, 6), (2, 6), (3, 1), (3, 5), (4, 1), (4, 4)]
import org.apache.spark.rdd.RDD
val rdd: RDD[(String, Int)] = sc.parallelize(Seq(
  ("Python", 1),
  ("Scala", 3),
  ("Java", 2)
))

val test: RDD[String] = rdd.map({
  case (lang, count) => {
    s"${lang}_${count}"
  }
})
test.collect().foreach(println)
// Python_1
// Scala_3
// Java_2
flatMap()
The signature is def flatMap[U](f: (T) ⇒ TraversableOnce[U]): RDD[U].
flatMap() is similar to map(): the function it takes also receives each row of the RDD as input: func(row). The difference is that the function must return an iterable (e.g. a tuple or a list, which may be empty), and flatMap() flattens the returned results. So the count() after flatMap() may be larger or smaller than that of the original RDD.
matrix = [
(1, 1, 1),
(1, 2, 1),
(1, 3, 1),
(1, 6, 0),
(2, 6, 1),
(3, 1, 1),
(3, 5, 1),
(4, 1, 0),
(4, 4, 1),
]
df = spark.createDataFrame(matrix, ['k', 'v', 'whatever'])
df.rdd.flatMap(lambda row: (row[0], row[1])).collect()
# [1, 1, 1, 2, 1, 3, 1, 6, 2, 6, 3, 1, 3, 5, 4, 1, 4, 4]
df.rdd.flatMap(lambda row: (row[2], )).collect()
# [1, 1, 1, 0, 1, 1, 1, 0, 1]
ref:
http://backtobazics.com/big-data/spark/apache-spark-flatmap-example/
http://apachesparkbook.blogspot.tw/2015/05/difference-between-map-flatmap-in-rdd.html
http://stackoverflow.com/questions/42867457/map-each-element-of-a-list-in-spark
http://stackoverflow.com/questions/21096432/list-or-iterator-of-tuples-returned-by-map-pyspark
reduce()
The signature is def reduce(f: (T, T) ⇒ T): T.
The function passed to reduce() takes two elements of the RDD at a time: func(element1, element2), and returns a single object of the same type as its inputs. The whole reduce() therefore produces a single value.
array = [
1,
5,
4,
2,
3,
]
rdd = sc.parallelize(array)
rdd.reduce(lambda element1, element2: element1 + element2)
# 15
def max(element1, element2):
    # note: this shadows Python's built-in max()
    return element1 if element1 > element2 else element2

rdd.reduce(max)
# 5
treeReduce()
The signature is def treeReduce(f: (T, T) ⇒ T, depth: Int = 2): T.
A plain reduce() sends every partition's partial result straight back to the driver for the final computation, which can become a bottleneck when there are many partitions or each partition holds a lot of data. In that case you can switch to treeReduce(), though using it carelessly can actually make things worse. The same relationship holds between aggregate() and treeAggregate().
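A minimal PySpark sketch of the difference; the numbers, partition count, and depth here are made up for illustration:
rdd = sc.parallelize(range(1, 101), 20)

# plain reduce(): all 20 partial results are sent straight back to the driver
rdd.reduce(lambda x, y: x + y)
# 5050

# treeReduce(): partial results are first combined on the executors
# in a tree of the given depth, so the driver receives fewer of them
rdd.treeReduce(lambda x, y: x + y, depth=3)
# 5050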
aggregate()
The signature is def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U): U.
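seqOp folds each element into a per-partition accumulator and combOp merges the accumulators across partitions, both starting from zeroValue, so the result type U can differ from the element type T. A minimal PySpark sketch, with made-up numbers, that computes a (sum, count) pair in one pass:
rdd = sc.parallelize([1, 5, 4, 2, 3])

def seq_op(acc, x):
    # fold one element into the (sum, count) accumulator
    return (acc[0] + x, acc[1] + 1)

def comb_op(acc1, acc2):
    # merge two partition-level accumulators
    return (acc1[0] + acc2[0], acc1[1] + acc2[1])

rdd.aggregate((0, 0), seq_op, comb_op)
# (15, 5)
The Scala example below applies the same idea to count the articles that mention a given language.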
import org.apache.spark.rdd.RDD
case class WikipediaArticle(title: String, text: String) {
  def mentionsLanguage(lang: String): Boolean = text.split(' ').contains(lang)
}

val wikiRdd: RDD[WikipediaArticle] = sc.parallelize(Seq(
  WikipediaArticle("a1", "abc Scala xyz"),
  WikipediaArticle("a2", "abc Python xyz"),
  WikipediaArticle("a3", "abc Scala xyz"),
  WikipediaArticle("a4", "abc Scala xyz")
))

def occurrencesOfLang(lang: String, rdd: RDD[WikipediaArticle]): Int = {
  val accumulateOp = (total: Int, a: WikipediaArticle) => total + 1
  val combineOp = (count1: Int, count2: Int) => count1 + count2
  rdd.filter(_.mentionsLanguage(lang)).aggregate(0)(accumulateOp, combineOp)
}
occurrencesOfLang("Scala", wikiRdd)
// 3
ref:
https://stackoverflow.com/questions/28240706/explain-the-aggregate-functionality-in-spark
PairRDD Methods
A PairRDD is an RDD of key/value pairs, i.e. RDD[(K, V)]. K and V can be primitive types, but they can also be other objects or collections. The methods in PairRDDFunctions all return RDD[(K, V)] as well.
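For instance, any RDD of 2-tuples automatically gets the PairRDD methods; a minimal PySpark sketch with made-up data:
pairs = sc.parallelize([('Python', 1), ('Scala', 3), ('Java', 2)])

pairs.keys().collect()
# ['Python', 'Scala', 'Java']
pairs.values().collect()
# [1, 3, 2]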
ref:
https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html
mapValues()
The signature is def mapValues[U](f: (V) ⇒ U): RDD[(K, U)].
The function passed to mapValues() takes a single argument, the V of the PairRDD; it operates on the values only.
val rdd = sc.parallelize(Seq(
  ("Python", 1),
  ("Scala", 3),
  ("Java", 2)
))

rdd
  .mapValues((count: Int) => {
    count * 10
  })
  .collect()
// Array((Python,10), (Scala,30), (Java,20))
ref:
http://apachesparkbook.blogspot.tw/2015/12/mapvalues-example.html
groupByKey()
The signature is def groupByKey(): RDD[(K, Iterable[V])].
You should avoid groupByKey() in the following situations:
- If the operation is expressed as groupByKey followed by an associative and commutative reducing operation on the values (sum, count, max / min), it should be replaced by reduceByKey (see the sketch after this list).
- If the operation can be expressed using a combination of a local sequence operation and a merge operation (online variance / mean, top-n observations), it should be expressed with combineByKey or aggregateByKey.
- If the final goal is to traverse the values in a specific order (groupByKey followed by sorting the values followed by iteration), it can typically be rewritten as repartitionAndSortWithinPartitions with a custom partitioner and ordering, followed by mapPartitions.
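A minimal PySpark sketch of the first case, with made-up data: both lines produce the same result, but reduceByKey() combines values inside each partition before the shuffle, while groupByKey() shuffles every single value:
from operator import add

pairs = sc.parallelize([('a', 1), ('b', 1), ('a', 1), ('a', 1)])

# shuffles every value, then sums on the reducer side
pairs.groupByKey().mapValues(sum).collect()
# [('a', 3), ('b', 1)]

# sums within each partition first, then shuffles only the partial sums
pairs.reduceByKey(add).collect()
# [('a', 3), ('b', 1)]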
case class WikipediaArticle(title: String, text: String) {
  def mentionsLanguage(lang: String): Boolean = text.split(' ').contains(lang)
}

val langs = List(
  "Java", "PHP", "Python", "Scala"
)

val wikiRdd: RDD[WikipediaArticle] = sc.parallelize(Seq(
  WikipediaArticle("a1", "a1 Scala"),
  WikipediaArticle("a2", "a2 Python"),
  WikipediaArticle("a3", "a3 Scala"),
  WikipediaArticle("a4", "a4 Java"),
  WikipediaArticle("a5", "a5 Java"),
  WikipediaArticle("a6", "a6 Scala")
))

def makeIndex(langs: List[String], rdd: RDD[WikipediaArticle]): RDD[(String, Iterable[WikipediaArticle])] = {
  rdd
    .flatMap((article: WikipediaArticle) => {
      langs
        .filter((lang: String) => {
          article.mentionsLanguage(lang)
        })
        .map((lang: String) => {
          (lang, article)
        })
    })
    .groupByKey()
}
def index: RDD[(String, Iterable[WikipediaArticle])] = makeIndex(langs, wikiRdd)
// Array(
// (Java,CompactBuffer(WikipediaArticle(a4,a4 Java), WikipediaArticle(a5,a5 Java))),
// (Python,CompactBuffer(WikipediaArticle(a2,a2 Python))),
// (Scala,CompactBuffer(WikipediaArticle(a1,a1 Scala), WikipediaArticle(a3,a3 Scala), WikipediaArticle(a6,a6 Scala)))
// )
def rankLangsUsingIndex(index: RDD[(String, Iterable[WikipediaArticle])]): List[(String, Int)] = {
  index
    .map({
      case (lang, articles) => {
        (lang, articles.size)
      }
    })
    .sortBy(-_._2)
    .collect()
    .toList
}
rankLangsUsingIndex(index)
// List((Scala,3), (Java,2), (Python,1))
// PHP doesn't appear because no article mentions it, so it never becomes a key in the index
Spark Best Practices
https://github.com/beeva/beeva-best-practices/blob/master/big_data/spark/README.md
RDD actions and Transformations by Example
https://github.com/awesome-spark/spark-gotchas/blob/master/04_rdd_actions_and_transformations_by_example.md
Avoid groupByKey when performing an associative reductive operation
https://umbertogriffo.gitbooks.io/apache-spark-best-practices-and-tuning/content/avoid_groupbykey_when_performing_an_associative_re.html
reduceByKey()
The signature is def reduceByKey(func: (V, V) ⇒ V): RDD[(K, V)].
The two arguments of the function passed to reduceByKey() have the type of the PairRDD's V. For the same result, reduceByKey() is much more efficient than groupByKey() followed by reduce().
matrix = [
(1, 1, 1),
(1, 2, 1),
(1, 3, 1),
(1, 6, 0),
(2, 6, 1),
(3, 1, 1),
(3, 5, 1),
(4, 1, 0),
(4, 4, 1),
]
df = spark.createDataFrame(matrix, ['k', 'v', 'whatever'])
df.rdd \
.map(lambda row: (row[0], row[1])) \
.reduceByKey(lambda x, y: x + y) \
.collect()
# map() => [(1, 1), (1, 2), (1, 3), (1, 6), (2, 6), (3, 1), (3, 5), (4, 1), (4, 4)]
# reduceByKey() => [(1, 12), (2, 6), (3, 6), (4, 5)]
matrix = [
    (1, [[100.0, 1.0], [200.0, 2.0]]),
    (2, [[300.0, 3.0]]),
    (2, [[400.0, 4.0], [600.0, 6.0]]),
    (2, [[500.0, 5.0]]),
]
df = spark.createDataFrame(matrix, ['user', 'recommendations'])

candidate_k = 2  # example value

def merge_recommendations(recommendations1, recommendations2):
    return recommendations1 + recommendations2

def slice_recommendations(row, candidate_k):
    user, recommendations = row
    # each recommendation is an [item, rating] pair, so sort by the rating element
    sliced_recommendations = sorted(recommendations, key=lambda recommendation: recommendation[1], reverse=True)[:candidate_k + 5]
    return (user, sliced_recommendations)

full_rdd = df \
    .rdd \
    .reduceByKey(lambda x, y: merge_recommendations(x, y)) \
    .map(lambda row: slice_recommendations(row, candidate_k))
final_df = spark.createDataFrame(full_rdd, ['user', 'recommendations'])
val langs = List(
  "Java", "PHP", "Python", "Scala"
)

val wikiRdd: RDD[WikipediaArticle] = sc.parallelize(Seq(
  WikipediaArticle("a1", "a1 Scala"),
  WikipediaArticle("a2", "a2 Python"),
  WikipediaArticle("a3", "a3 Scala"),
  WikipediaArticle("a4", "a4 Java"),
  WikipediaArticle("a5", "a5 Java"),
  WikipediaArticle("a6", "a6 Scala")
))

def rankLangsReduceByKey(langs: List[String], rdd: RDD[WikipediaArticle]): List[(String, Int)] = {
  rdd
    .flatMap((article: WikipediaArticle) => {
      langs
        .filter((lang: String) => {
          article.mentionsLanguage(lang)
        })
        .map((lang: String) => {
          (lang, 1)
        })
    })
    .reduceByKey((count1: Int, count2: Int) => {
      count1 + count2
    })
    .collect()
    .toList
    .sortWith(_._2 > _._2)
}
rankLangsReduceByKey(langs, wikiRdd)
// List((Scala,3), (Java,2), (Python,1))
// again, PHP never gets a key because no article mentions it
Avoid reduceByKey when the input and output value types are different
http://backtobazics.com/big-data/spark/apache-spark-reducebykey-example/
Reduce a key-value pair into a key-list pair
http://stackoverflow.com/questions/27002161/reduce-a-key-value-pair-into-a-key-list-pair-with-apache-spark
foldByKey()
The signature is def foldByKey(zeroValue: V)(func: (V, V) ⇒ V): RDD[(K, V)].
foldByKey() is basically a reduceByKey() where you can specify the zero value yourself.
from operator import add

matrix = [
(1, 1, 1),
(1, 2, 1),
(1, 3, 1),
(1, 6, 0),
(2, 6, 1),
(3, 1, 1),
(3, 5, 1),
(4, 1, 0),
(4, 4, 1),
]
df = spark.createDataFrame(matrix, ['k', 'v', 'whatever'])
df.rdd \
.map(lambda row: (row[0], [row[1], ])) \
.foldByKey(list(), add) \
.collect()
# [(4, [1, 4]), (1, [1, 2, 3, 6]), (2, [6]), (3, [1, 5])]
aggregateByKey()
The signature is def aggregateByKey[U](zeroValue: U)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U): RDD[(K, U)].
It is a good replacement for patterns like .map(lambda row: (row['user'], [row['item'], ])).reduceByKey(lambda v1, v2: v1 + v2).
matrix = [
(1, 1, 1),
(1, 2, 1),
(1, 4, 1),
(1, 5, 1),
(2, 2, 1),
(2, 2, 1),
(2, 3, 1),
(3, 5, 1),
]
df = spark.createDataFrame(matrix, ['k', 'v', 'whatever'])

def seqFunc(item_set, item):
    item_set.add(item)
    return item_set

def combFunc(item_set1, item_set2):
    return item_set1.union(item_set2)

df.select('k', 'v').rdd \
    .aggregateByKey(set(), seqFunc, combFunc) \
    .collect()
# [(1, {1, 2, 4, 5}), (2, {2, 3}), (3, {5})]
ref:
http://codingjunkie.net/spark-agr-by-key/
https://stackoverflow.com/questions/31081563/apache-spark-what-is-the-equivalent-implementation-of-rdd-groupbykey-using-rd
https://stackoverflow.com/questions/24804619/how-does-spark-aggregate-function-aggregatebykey-work
combineByKey()
The signature is def combineByKey[C](createCombiner: (V) ⇒ C, mergeValue: (C, V) ⇒ C, mergeCombiners: (C, C) ⇒ C): RDD[(K, C)].
It turns an RDD[(K, V)] into an RDD[(K, C)], where C can be any type. combineByKey() takes three functions:
- createCombiner, which turns a V into a C (e.g., creates a one-element list)
- mergeValue, to merge a V into a C (e.g., adds it to the end of a list)
- mergeCombiners, to combine two C’s into a single one.
import pyspark.sql.functions as F
matrix = [
('chinese', 80),
('math', 60),
('english', 100),
('chinese', 90),
('math', 100),
('math', 10),
('english', 70),
('english', 20),
('history', 30),
]
df = spark.createDataFrame(matrix, ['subject', 'score'])
def createCombiner(score):
    return (score, 1)

def mergeValue(accumulate, score):
    total_score = accumulate[0] + score
    total_count = accumulate[1] + 1
    return (total_score, total_count)

def mergeCombiners(accumulate1, accumulate2):
    total_score = accumulate1[0] + accumulate2[0]
    total_count = accumulate1[1] + accumulate2[1]
    return (total_score, total_count)
df.rdd.combineByKey(createCombiner, mergeValue, mergeCombiners).collect()
# you could calculate the average score of every subject
# [('chinese', (170, 2)),
# ('history', (30, 1)),
# ('math', (170, 3)),
# ('english', (190, 3))]
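As the comment above suggests, a mapValues() on top of this result yields the average score of every subject; a minimal sketch reusing the functions defined above:
df.rdd \
    .combineByKey(createCombiner, mergeValue, mergeCombiners) \
    .mapValues(lambda acc: acc[0] / float(acc[1])) \
    .collect()
# roughly [('chinese', 85.0), ('history', 30.0), ('math', 56.67), ('english', 63.33)]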
ref:
https://zhangyi.gitbooks.io/spark-in-action/content/chapter2/rdd.html
cogroup()
If you want to join two RDDs that have already been grouped by key, use cogroup(); avoid the flatMap + join + groupBy pattern.
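A minimal PySpark sketch with made-up data: cogroup() groups both RDDs by key in one pass and, for every key, gives you a pair of iterables, one with the values from each side:
scores = sc.parallelize([('Python', 1), ('Scala', 3), ('Scala', 5)])
years = sc.parallelize([('Python', 1991), ('Scala', 2004), ('Java', 1995)])

# each value is a pair of ResultIterables; turn them into lists to inspect them
scores.cogroup(years) \
    .mapValues(lambda iterables: (list(iterables[0]), list(iterables[1]))) \
    .collect()
# [('Python', ([1], [1991])), ('Scala', ([3, 5], [2004])), ('Java', ([], [1995]))]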