{"id":377,"date":"2017-05-02T00:49:26","date_gmt":"2017-05-01T16:49:26","guid":{"rendered":"http:\/\/vinta.ws\/code\/?p=377"},"modified":"2026-02-18T01:20:36","modified_gmt":"2026-02-17T17:20:36","slug":"spark-rdd-methods-pyspark","status":"publish","type":"post","link":"https:\/\/vinta.ws\/code\/spark-rdd-methods-pyspark.html","title":{"rendered":"Spark RDD methods (Python \/ Scala)"},"content":{"rendered":"<p>\u4ee5\u4e0b\u7684 methods \u6709\u4e9b\u662f\u6240\u6709 RDD \u90fd\u80fd\u7528\uff0c\u6709\u4e9b\u5247\u662f PairRDD \u624d\u6709\u7684\u529f\u80fd\u3002\u7136\u5f8c\u56e0\u70ba\u5728\u4e0d\u540c\u7684 projects \u6211\u5148\u5f8c\u7528\u4e86 Spark \u7684 Python API \u548c Scala API\uff0c\u6240\u4ee5\u4ee5\u4e0b\u7684\u5167\u5bb9\u53ef\u80fd\u6703\u6df7\u96dc\u8457\u5169\u8005\u7684\u7bc4\u4f8b\u3002Scala API \u8981\u7279\u5225\u6ce8\u610f\u6bcf\u500b method \u63a5\u53d7\u548c\u56de\u50b3\u7684 type \u7684\u5dee\u7570\uff1bPython API \u5c31\u6c92\u6709\u9019\u7a2e\u9650\u5236\u4e86\uff0c\u7562\u7adf\u662f\u52d5\u614b\u8a9e\u8a00\u3002<\/p>\n<p>ref:<br \/>\n<a href=\"http:\/\/spark.apache.org\/docs\/latest\/programming-guide.html#transformations\">http:\/\/spark.apache.org\/docs\/latest\/programming-guide.html#transformations<\/a><br \/>\n<a href=\"http:\/\/spark.apache.org\/docs\/latest\/api\/scala\/index.html#org.apache.spark.rdd.PairRDDFunctions\">http:\/\/spark.apache.org\/docs\/latest\/api\/scala\/index.html#org.apache.spark.rdd.PairRDDFunctions<\/a><\/p>\n<h2>RDD Methods<\/h2>\n<h3>map()<\/h3>\n<p>\u51fd\u6578 signature \u70ba <code>def map[U](f: (T) \u21d2 U): RDD[U]<\/code>\u3002<\/p>\n<p><code>map()<\/code> \u63a5\u53d7\u7684 function \u7684\u8f38\u5165\u53c3\u6578\u5c31\u662f RDD \u7684\u6bcf\u500b\u5143\u7d20\uff08\u5f9e DataFrame \u7684\u89d2\u5ea6\u770b\uff0c\u6bcf\u500b row\uff09\uff1a<code>func(row)<\/code>\uff0creturn \u4e00\u500b\u4efb\u610f\u7269\u4ef6\uff08\u4f8b\u5982\u4e00\u500b int\u3001\u4e00\u500b string \u6216\u4e00\u500b tuple\uff09\u3002\u6240\u4ee5 <code>map()<\/code> \u4e4b\u5f8c\u6703\u5f97\u5230\u4e00\u500b row \u6578\u76f8\u540c\u7684 RDD\uff0c\u4f46\u662f type \u53ef\u80fd\u6703\u4e0d\u4e00\u6a23\u3002<\/p>\n<pre class=\"line-numbers\"><code class=\"language-py\">matrix = [\n    (1, 1, 1),\n    (1, 2, 1),\n    (1, 3, 1),\n    (1, 6, 0),\n    (2, 6, 1),\n    (3, 1, 1),\n    (3, 5, 1),\n    (4, 1, 0),\n    (4, 4, 1),\n]\ndf = spark.createDataFrame(matrix, ['k', 'v', 'whatever'])\n\ndf.rdd.map(lambda row: (row[0], row[1])).collect()\n# [(1, 1), (1, 2), (1, 3), (1, 6), (2, 6), (3, 1), (3, 5), (4, 1), (4, 4)]<\/code><\/pre>\n<pre class=\"line-numbers\"><code class=\"language-scala\">import org.apache.spark.rdd.RDD\n\nval rdd: RDD[(String, Int)] = sc.parallelize(Seq(\n  (\"Python\", 1),\n  (\"Scala\", 3),\n  (\"Java\", 2)\n))\nval test: RDD[String] = rdd.map({\n  case (lang, count) =&gt; {\n    s\"${lang}_${count}\"\n  }\n})\ntest.collect().foreach(println)\n\/\/ Python_1\n\/\/ Scala_3\n\/\/ Java_2<\/code><\/pre>\n<h3>flatMap()<\/h3>\n<p>\u51fd\u6578 signature \u70ba <code>def flatMap[U](f: (T) \u21d2 TraversableOnce[U]): RDD[U]<\/code>\u3002<\/p>\n<p><code>flatMap()<\/code> \u8ddf <code>map()<\/code> \u5f88\u50cf\uff0c\u63a5\u53d7\u7684 function \u7684\u8f38\u5165\u53c3\u6578\u4e5f\u662f RDD \u7684\u6bcf\u500b row\uff1a<code>func(row)<\/code>\uff0c\u5dee\u5225\u5728\u65bc <code>flatMap()<\/code> \u53ea\u80fd\u56de\u50b3\u4e00\u500b Iterable \u7269\u4ef6\uff08\u4f8b\u5982\u4e00\u500b tuple \u6216 list\uff0c\u4f46\u662f\u53ef\u4ee5\u662f\u7a7a\u7684\uff09\uff0c\u800c\u4e14 <code>flatMap()<\/code> \u6703\u628a return \u7684\u7d50\u679c\u6524\u5e73\u3002\u6240\u4ee5 <code>flatMap()<\/code> \u4e4b\u5f8c\u7684 <code>count()<\/code> \u53ef\u80fd\u6703\u6bd4\u539f\u672c\u7684 RDD \u5927\u6216\u5c0f\u3002<\/p>\n<pre class=\"line-numbers\"><code class=\"language-py\">matrix = [\n    (1, 1, 1),\n    (1, 2, 1),\n    (1, 3, 1),\n    (1, 6, 0),\n    (2, 6, 1),\n    (3, 1, 1),\n    (3, 5, 1),\n    (4, 1, 0),\n    (4, 4, 1),\n]\ndf = spark.createDataFrame(matrix, ['k', 'v', 'whatever'])\n\ndf.rdd.flatMap(lambda row: (row[0], row[1])).collect()\n# [1, 1, 1, 2, 1, 3, 1, 6, 2, 6, 3, 1, 3, 5, 4, 1, 4, 4]\n\ndf.rdd.flatMap(lambda row: (row[2], )).collect()\n# [1, 1, 1, 0, 1, 1, 1, 0, 1]<\/code><\/pre>\n<p>ref:<br \/>\n<a href=\"http:\/\/backtobazics.com\/big-data\/spark\/apache-spark-flatmap-example\/\">http:\/\/backtobazics.com\/big-data\/spark\/apache-spark-flatmap-example\/<\/a><br \/>\n<a href=\"http:\/\/apachesparkbook.blogspot.tw\/2015\/05\/difference-between-map-flatmap-in-rdd.html\">http:\/\/apachesparkbook.blogspot.tw\/2015\/05\/difference-between-map-flatmap-in-rdd.html<\/a><br \/>\n<a href=\"http:\/\/stackoverflow.com\/questions\/42867457\/map-each-element-of-a-list-in-spark\">http:\/\/stackoverflow.com\/questions\/42867457\/map-each-element-of-a-list-in-spark<\/a><br \/>\n<a href=\"http:\/\/stackoverflow.com\/questions\/21096432\/list-or-iterator-of-tuples-returned-by-map-pyspark\">http:\/\/stackoverflow.com\/questions\/21096432\/list-or-iterator-of-tuples-returned-by-map-pyspark<\/a><\/p>\n<h3>reduce()<\/h3>\n<p>\u51fd\u6578 signature \u70ba <code>def reduce(f: (T, T) \u21d2 T): T<\/code>\u3002<\/p>\n<p><code>reduce()<\/code> \u63a5\u53d7\u7684 function \u7684\u8f38\u5165\u53c3\u6578\u662f RDD \u7684\u5169\u5169\u5143\u7d20\uff1a<code>func(element1, element2)<\/code>\uff0creturn \u55ae\u4e00\u7269\u4ef6\uff0c\u800c\u4e14\u662f\u8ddf\u8f38\u5165\u53c3\u6578\u540c\u6a23 type \u7684\uff0c\u6700\u5f8c\u6574\u500b <code>reduce()<\/code> \u6703\u5f97\u5230\u4e00\u500b\u55ae\u4e00\u7684\u503c\u3002<\/p>\n<pre class=\"line-numbers\"><code class=\"language-py\">array = [\n    1,\n    5,\n    4,\n    2,\n    3,\n]\nrdd = sc.parallelize(array)\n\nrdd.reduce(lambda element1, element2: element1 + element2)\n# 15\n\ndef max(element1, element2):\n    return element1 if element1 &gt; element2 else element2\n\nrdd.reduce(max)\n# 5<\/code><\/pre>\n<h3>treeReduce()<\/h3>\n<p>\u51fd\u6578 signature \u70ba <code>def treeReduce(f: (T, T) \u21d2 T, depth: Int = 2): T<\/code>\u3002<\/p>\n<p>\u666e\u901a\u7684 <code>reduce()<\/code> \u6703\u76f4\u63a5\u628a\u6bcf\u4e00\u500b partition \u7684 reduce \u7d50\u679c\u9001\u56de driver machine \u505a\u6700\u5f8c\u904b\u7b97\uff0c\u7576 partition \u5f88\u591a\u548c\u6bcf\u500b partition \u7684\u8cc7\u6599\u91cf\u5f88\u5927\u7684\u6642\u5019\u53ef\u80fd\u6703\u662f\u4e00\u500b\u74f6\u9838\u3002\u9019\u6642\u5019\u4f60\u53ef\u4ee5\u6539\u7528 <code>treeReduce()<\/code>\uff0c\u4e0d\u904e\u4f7f\u7528\u4e0d\u7576\u7684\u8a71\u53cd\u800c\u6703\u6709\u53cd\u6548\u679c\u3002\u985e\u4f3c\u7684\u95dc\u4fc2\u9084\u6709 <code>aggregate()<\/code> \u548c <code>treeAggregate()<\/code>\u3002<\/p>\n<p>ref:<br \/>\n<a href=\"https:\/\/umbertogriffo.gitbooks.io\/apache-spark-best-practices-and-tuning\/content\/treereduce_and_treeaggregate_demystified.html\">https:\/\/umbertogriffo.gitbooks.io\/apache-spark-best-practices-and-tuning\/content\/treereduce_and_treeaggregate_demystified.html<\/a><\/p>\n<h3>aggregate()<\/h3>\n<p>\u51fd\u6578 signature \u70ba <code>def aggregate[U](zeroValue: U)(seqOp: (U, T) \u21d2 U, combOp: (U, U) \u21d2 U): U<\/code>\u3002<\/p>\n<pre class=\"line-numbers\"><code class=\"language-scala\">import org.apache.spark.rdd.RDD\n\ncase class WikipediaArticle(title: String, text: String) {\n  def mentionsLanguage(lang: String): Boolean = text.split(' ').contains(lang)\n}\n\nval wikiRdd: RDD[WikipediaArticle] = sc.parallelize(Seq(\n  WikipediaArticle(\"a1\", \"abc Scala xyz\"),\n  WikipediaArticle(\"a2\", \"abc Python xyz\"),\n  WikipediaArticle(\"a3\", \"abc Scala xyz\"),\n  WikipediaArticle(\"a4\", \"abc Scala xyz\")\n))\n\ndef occurrencesOfLang(lang: String, rdd: RDD[WikipediaArticle]): Int = {\n  val accumulateOp = (total: Int, a: WikipediaArticle) =&gt; total + 1\n  val combineOp = (count1: Int, count2: Int) =&gt; count1 + count2\n  rdd.filter(_.mentionsLanguage(lang)).aggregate(0)(accumulateOp, combineOp)\n}\n\noccurrencesOfLang(\"Scala\", wikiRdd)\n\/\/ 3<\/code><\/pre>\n<p>ref:<br \/>\n<a href=\"https:\/\/stackoverflow.com\/questions\/28240706\/explain-the-aggregate-functionality-in-spark\">https:\/\/stackoverflow.com\/questions\/28240706\/explain-the-aggregate-functionality-in-spark<\/a><\/p>\n<h2>PairRDD Methods<\/h2>\n<p>PairRDD \u5c31\u662f\u5305\u542b key\/value pair \u7684 RDD\uff0c\u9577\u9019\u6a23\uff1a<code>RDD[(K, V)]<\/code>\u3002<code>K<\/code> \u548c <code>V<\/code> \u9664\u4e86\u53ef\u80fd\u662f\u57fa\u672c\u7684 type \u4e4b\u5916\uff0c\u4e5f\u53ef\u80fd\u662f\u5176\u4ed6 object \u6216 collection\u3002<code>PairRDDFunctions<\/code> \u56de\u50b3\u7684\u4e5f\u90fd\u662f <code>RDD[(K, V)]<\/code>\u3002<\/p>\n<p>ref:<br \/>\n<a href=\"https:\/\/www.safaribooksonline.com\/library\/view\/learning-spark\/9781449359034\/ch04.html\">https:\/\/www.safaribooksonline.com\/library\/view\/learning-spark\/9781449359034\/ch04.html<\/a><\/p>\n<h3>mapValues()<\/h3>\n<p>\u51fd\u6578 signature \u70ba <code>def mapValues[U](f: (V) \u21d2 U): RDD[(K, U)]<\/code>\u3002<\/p>\n<p><code>mapValues()<\/code> \u7684 function \u63a5\u53d7\u7684\u552f\u4e00\u4e00\u500b\u53c3\u6578\u5c31\u662f <code>PairRDD<\/code> \u7684 <code>V<\/code>\uff0c\u5b83\u53ea\u8655\u7406 values\u3002<\/p>\n<pre class=\"line-numbers\"><code class=\"language-scala\">val rdd = sc.parallelize(Seq(\n  (\"Python\", 1),\n  (\"Scala\", 3),\n  (\"Java\", 2)\n))\n\nrdd\n  .mapValues((count: Int) =&gt; {\n    count * 10\n  })\n  .collect()\n\/\/ Array((Python,10), (Scala,30), (Java,20))<\/code><\/pre>\n<p>ref:<br \/>\n<a href=\"http:\/\/apachesparkbook.blogspot.tw\/2015\/12\/mapvalues-example.html\">http:\/\/apachesparkbook.blogspot.tw\/2015\/12\/mapvalues-example.html<\/a><\/p>\n<h3>groupByKey()<\/h3>\n<p>\u51fd\u6578 signature \u70ba <code>def groupByKey(): RDD[(K, Iterable[V])]<\/code>\u3002<\/p>\n<p>\u4ee5\u4e0b\u9019\u4e9b\u60c5\u6cc1\u61c9\u8a72\u907f\u514d\u4f7f\u7528 <code>groupByKey()<\/code>\uff1a<\/p>\n<ul>\n<li>If operataion is expressed using groupByKey followed by associative and commutative reducing operation on values (<code>sum<\/code>, <code>count<\/code>, <code>max<\/code> \/ <code>min<\/code>) it should be replaced by <code>reduceByKey<\/code>.<\/li>\n<li>If operation can be expressed using a comination of local sequence operation and merge operation (online variance \/ mean, top-n observations) it should be expressed with <code>combineByKey<\/code> or <code>aggregateByKey<\/code>.<\/li>\n<li>If final goal is to traverse values in a specific order (<code>groupByKey<\/code> followed by sorting values followed by iteration) it can be typically rewritten as <code>repartitionAndSortWithinPartitions<\/code> with custom partitioner and ordering followed by <code>mapPartitions<\/code>.<\/li>\n<\/ul>\n<pre class=\"line-numbers\"><code class=\"language-scala\">case class WikipediaArticle(title: String, text: String) {\n  def mentionsLanguage(lang: String): Boolean = text.split(' ').contains(lang)\n}\n\nval langs = List(\n  \"Java\", \"PHP\", \"Python\", \"Scala\"\n)\n\nval wikiRdd: RDD[WikipediaArticle] = sc.parallelize(Seq(\n  WikipediaArticle(\"a1\", \"a1 Scala\"),\n  WikipediaArticle(\"a2\", \"a2 Python\"),\n  WikipediaArticle(\"a3\", \"a3 Scala\"),\n  WikipediaArticle(\"a4\", \"a4 Java\"),\n  WikipediaArticle(\"a5\", \"a5 Java\"),\n  WikipediaArticle(\"a6\", \"a6 Scala\")\n))\n\ndef makeIndex(langs: List[String], rdd: RDD[WikipediaArticle]): RDD[(String, Iterable[WikipediaArticle])] = {\n  rdd\n    .flatMap((article: WikipediaArticle) =&gt; {\n      langs\n        .filter((lang: String) =&gt; {\n          article.mentionsLanguage(lang)\n        })\n        .map((lang: String) =&gt; {\n          (lang, article)\n        })\n    })\n    .groupByKey()\n}\n\ndef index: RDD[(String, Iterable[WikipediaArticle])] = makeIndex(langs, wikiRdd)\n\/\/ Array(\n\/\/   (Java,CompactBuffer(WikipediaArticle(a4,a4 Java), WikipediaArticle(a5,a5 Java))),\n\/\/   (Python,CompactBuffer(WikipediaArticle(a2,a2 Python))),\n\/\/   (Scala,CompactBuffer(WikipediaArticle(a1,a1 Scala), WikipediaArticle(a3,a3 Scala), WikipediaArticle(a6,a6 Scala)))\n\/\/ )\n\ndef rankLangsUsingIndex(index: RDD[(String, Iterable[WikipediaArticle])]): List[(String, Int)] = {\n  index\n    .map({\n      case (lang, articles) =&gt; {\n        (lang, articles.size)\n      }\n    })\n    .sortBy(-_._2)\n    .collect()\n    .toList\n}\n\nrankLangsUsingIndex(index)\n\/\/ List((Scala,3),(Java,2),(Python,1),(PHP,0))<\/code><\/pre>\n<p>Spark Best Practices<br \/>\n<a href=\"https:\/\/github.com\/beeva\/beeva-best-practices\/blob\/master\/big_data\/spark\/README.md\">https:\/\/github.com\/beeva\/beeva-best-practices\/blob\/master\/big_data\/spark\/README.md<\/a><\/p>\n<p>RDD actions and Transformations by Example<br \/>\n<a href=\"https:\/\/github.com\/awesome-spark\/spark-gotchas\/blob\/master\/04_rdd_actions_and_transformations_by_example.md\">https:\/\/github.com\/awesome-spark\/spark-gotchas\/blob\/master\/04_rdd_actions_and_transformations_by_example.md<\/a><\/p>\n<p>Avoid groupByKey when performing an associative reductive operation<br \/>\n<a href=\"https:\/\/umbertogriffo.gitbooks.io\/apache-spark-best-practices-and-tuning\/content\/avoid_groupbykey_when_performing_an_associative_re.html\">https:\/\/umbertogriffo.gitbooks.io\/apache-spark-best-practices-and-tuning\/content\/avoid_groupbykey_when_performing_an_associative_re.html<\/a><\/p>\n<h3>reduceByKey()<\/h3>\n<p>\u51fd\u6578 signature \u70ba <code>def reduceByKey(func: (V, V) \u21d2 V): RDD[(K, V)]<\/code>\u3002<\/p>\n<p><code>reduceByKey()<\/code> \u63a5\u53d7\u7684\u5169\u500b\u53c3\u6578\u7684\u985e\u578b\u662f\u7531 <code>PairRDD<\/code> \u7684 <code>V<\/code> \u6c7a\u5b9a\u7684\u3002\u505a\u5230\u540c\u6a23\u7684\u529f\u80fd\uff0c<code>reduceByKey()<\/code> \u7684\u57f7\u884c\u6548\u7387\u6bd4 <code>groupByKey()<\/code> + <code>reduce()<\/code> \u597d\u5f88\u591a\u3002<\/p>\n<pre class=\"line-numbers\"><code class=\"language-py\">matrix = [\n    (1, 1, 1),\n    (1, 2, 1),\n    (1, 3, 1),\n    (1, 6, 0),\n    (2, 6, 1),\n    (3, 1, 1),\n    (3, 5, 1),\n    (4, 1, 0),\n    (4, 4, 1),\n]\ndf = spark.createDataFrame(matrix, ['k', 'v', 'whatever'])\n\ndf.rdd \n    .map(lambda row: (row[0], row[1])) \n    .reduceByKey(lambda x, y: x + y) \n    .collect()\n# map() =&gt; [(1, 1), (1, 2), (1, 3), (1, 6), (2, 6), (3, 1), (3, 5), (4, 1), (4, 4)]\n# reduceByKey() =&gt; [(1, 12), (2, 6), (3, 6), (4, 5)]<\/code><\/pre>\n<pre class=\"line-numbers\"><code class=\"language-py\">matrix = [\n    (1, [[100, 1.0], [200, 2.0]]),\n    (2, [[300, 3.0]]),\n    (2, [[400, 4.0], [600, 6.0]]),\n    (2, [[500, 5.0]]),\n]\ndf = spark.createDataFrame(matrix, ['user', 'recommendations'])\n\ndef merge_recommendations(recommendations1, recommendations2):\n    return recommendations1 + recommendations2\n\ndef slice_recommendations(row, candidate_k):\n    user, recommendations = row\n    sliced_recommendations = sorted(recommendations, key=lambda recommendation: recommendation.rating, reverse=True)[:candidate_k + 5]\n    return (user, sliced_recommendations)\n\nfull_rdd = df \n    .rdd \n    .reduceByKey(lambda x, y: merge_recommendations(x, y)) \n    .map(lambda row: slice_recommendations(row, candidate_k))\nfinal_df = spark.createDataFrame(full_rdd, ['user', 'recommendations'])<\/code><\/pre>\n<pre class=\"line-numbers\"><code class=\"language-scala\">val langs = List(\n  \"Java\", \"PHP\", \"Python\", \"Scala\"\n)\n\nval wikiRdd: RDD[WikipediaArticle] = sc.parallelize(Seq(\n  WikipediaArticle(\"a1\", \"a1 Scala\"),\n  WikipediaArticle(\"a2\", \"a2 Python\"),\n  WikipediaArticle(\"a3\", \"a3 Scala\"),\n  WikipediaArticle(\"a4\", \"a4 Java\"),\n  WikipediaArticle(\"a5\", \"a5 Java\"),\n  WikipediaArticle(\"a6\", \"a6 Scala\")\n))\n\ndef rankLangsReduceByKey(langs: List[String], rdd: RDD[WikipediaArticle]): List[(String, Int)] = {\n  rdd\n    .flatMap((article: WikipediaArticle) =&gt; {\n      langs\n        .filter((lang: String) =&gt; {\n          article.mentionsLanguage(lang)\n        })\n        .map((lang: String) =&gt; {\n          (lang, 1)\n        })\n    })\n    .reduceByKey((count1: Int, count2: Int) =&gt; {\n      count1 + count2\n    })\n    .collect()\n    .toList\n    .sortWith(_._2 &gt; _._2)\n}\n\nrankLangsReduceByKey(langs, wikiRdd)\n\/\/ List((Scala,3),(Java,2),(Python,1),(PHP,0))<\/code><\/pre>\n<p>Avoid reduceByKey when the input and output value types are different<br \/>\n<a href=\"http:\/\/backtobazics.com\/big-data\/spark\/apache-spark-reducebykey-example\/\">http:\/\/backtobazics.com\/big-data\/spark\/apache-spark-reducebykey-example\/<\/a><\/p>\n<p>Reduce a key-value pair into a key-list pair<br \/>\n<a href=\"http:\/\/stackoverflow.com\/questions\/27002161\/reduce-a-key-value-pair-into-a-key-list-pair-with-apache-spark\">http:\/\/stackoverflow.com\/questions\/27002161\/reduce-a-key-value-pair-into-a-key-list-pair-with-apache-spark<\/a><\/p>\n<h3>foldByKey()<\/h3>\n<p>\u51fd\u6578 signature \u70ba <code>def foldByKey(zeroValue: V)(func: (V, V) \u21d2 V): RDD[(K, V)]<\/code>\u3002<\/p>\n<p><code>foldByKey()<\/code> \u57fa\u672c\u4e0a\u5c31\u662f\u53ef\u4ee5\u624b\u52d5\u6307\u5b9a zero value \u7684 <code>reduceByKey()<\/code>\u3002<\/p>\n<pre class=\"line-numbers\"><code class=\"language-py\">matrix = [\n    (1, 1, 1),\n    (1, 2, 1),\n    (1, 3, 1),\n    (1, 6, 0),\n    (2, 6, 1),\n    (3, 1, 1),\n    (3, 5, 1),\n    (4, 1, 0),\n    (4, 4, 1),\n]\ndf = spark.createDataFrame(matrix, ['k', 'v', 'whatever'])\n\ndf.rdd \n    .map(lambda row: (row[0], [row[1], ])) \n    .foldByKey(list(), add) \n    .collect()\n# [(4, [1, 4]), (1, [1, 2, 3, 6]), (2, [6]), (3, [1, 5])]<\/code><\/pre>\n<h3>aggregateByKey()<\/h3>\n<p>\u51fd\u6578 signature \u70ba <code>def aggregateByKey[U](zeroValue: U)(seqOp: (U, V) \u21d2 U, combOp: (U, U) \u21d2 U): RDD[(K, U)]<\/code>\u3002<\/p>\n<p>\u9069\u5408\u7528\u4f86\u53d6\u4ee3 <code>.map(lambda row: (row[&#039;user&#039;], [row[&#039;item&#039;], ])).reduceByKey(lambda v1, v2: v1 + v2)<\/code> \u9019\u6a23\u7684\u5957\u8def\u3002<\/p>\n<pre class=\"line-numbers\"><code class=\"language-py\">matrix = [\n    (1, 1, 1),\n    (1, 2, 1),\n    (1, 4, 1),\n    (1, 5, 1),\n    (2, 2, 1),\n    (2, 2, 1),\n    (2, 3, 1),\n    (3, 5, 1),\n]\ndf = spark.createDataFrame(matrix, ['k', 'v', 'whatever'])\n\ndef seqFunc(item_set, item):\n    item_set.add(item)\n    return item_set\n\ndef combFunc(item_set1, item_set2):\n    return item_set1.union(item_set2)\n\ndf.select('user', 'item').rdd \n    .aggregateByKey(set(), seqFunc, combFunc) \n    .collect()\n# [(1, {1, 2, 4, 5}), (2, {2, 3}), (3, {5})]<\/code><\/pre>\n<p>ref:<br \/>\n<a href=\"http:\/\/codingjunkie.net\/spark-agr-by-key\/\">http:\/\/codingjunkie.net\/spark-agr-by-key\/<\/a><br \/>\n<a href=\"https:\/\/stackoverflow.com\/questions\/31081563\/apache-spark-what-is-the-equivalent-implementation-of-rdd-groupbykey-using-rd\">https:\/\/stackoverflow.com\/questions\/31081563\/apache-spark-what-is-the-equivalent-implementation-of-rdd-groupbykey-using-rd<\/a><br \/>\n<a href=\"https:\/\/stackoverflow.com\/questions\/24804619\/how-does-spark-aggregate-function-aggregatebykey-work\">https:\/\/stackoverflow.com\/questions\/24804619\/how-does-spark-aggregate-function-aggregatebykey-work<\/a><\/p>\n<h3>combineByKey()<\/h3>\n<p>\u51fd\u6578 signature \u70ba <code>def combineByKey[C](createCombiner: (V) \u21d2 C, mergeValue: (C, V) \u21d2 C, mergeCombiners: (C, C) \u21d2 C): RDD[(K, C)]<\/code>\u3002<\/p>\n<p>\u7528\u4f86\u628a <code>RDD[(K, V)]<\/code> \u8f49\u8b8a\u6210 <code>RDD[(K, C)]<\/code>\uff0c<code>C<\/code> \u53ef\u4ee5\u662f\u4efb\u610f\u7684 type\u3002<\/p>\n<p><code>combineByKey()<\/code> \u63a5\u53d7\u4e09\u500b functions\uff1a<\/p>\n<ul>\n<li>createCombiner, which turns a V into a C (e.g., creates a one-element list)<\/li>\n<li>mergeValue, to merge a V into a C (e.g., adds it to the end of a list)<\/li>\n<li>mergeCombiners, to combine two C\u2019s into a single one.<\/li>\n<\/ul>\n<pre class=\"line-numbers\"><code class=\"language-py\">import pyspark.sql.functions as F\n\nmatrix = [\n    ('chinese', 80),\n    ('math', 60),\n    ('english', 100),\n    ('chinese', 90),\n    ('math', 100),\n    ('math', 10),\n    ('english', 70),\n    ('english', 20),\n    ('history', 30),\n]\ndf = spark.createDataFrame(matrix, ['subject', 'score'])\n\ndef createCombiner(score):\n    return (score, 1)\n\ndef mergeValue(accumulate, score):\n    total_score = accumulate[0] + score\n    total_count = accumulate[1] + 1\n    return (total_score, total_count)\n\ndef mergeCombiners(accumulate1, accumulate2):\n    total_score = accumulate1[0] + accumulate2[0]\n    total_count = accumulate1[1] + accumulate2[1]\n    return (total_score, total_count)\n\ndf.rdd.combineByKey(createCombiner, mergeValue, mergeCombiners).collect()\n# you could calculate the average score of every subject\n# [('chinese', (170, 2)),\n # ('history', (30, 1)),\n # ('math', (170, 3)),\n # ('english', (190, 3))]<\/code><\/pre>\n<p>ref:<br \/>\n<a href=\"https:\/\/zhangyi.gitbooks.io\/spark-in-action\/content\/chapter2\/rdd.html\">https:\/\/zhangyi.gitbooks.io\/spark-in-action\/content\/chapter2\/rdd.html<\/a><\/p>\n<h3>cogroup()<\/h3>\n<p>\u5982\u679c\u4f60\u8981 <code>join<\/code> \u5169\u500b\u5df2\u7d93\u88ab <code>groupBy<\/code> \u7684 RDD\uff0c\u53ef\u4ee5\u4f7f\u7528 <code>cogroup()<\/code>\u3002\u907f\u514d\u4f7f\u7528 <code>flatMap<\/code> + <code>join<\/code> + <code>groupBy<\/code> \u9019\u6a23\u7684\u5957\u8def\u3002<\/p>\n","protected":false},"excerpt":{"rendered":"<p>\u4ee5\u4e0b\u7684 methods \u6709\u4e9b\u662f\u6240\u6709 RDD \u90fd\u80fd\u7528\uff0c\u6709\u4e9b\u5247\u662f PairRDD \u624d\u6709\u7684\u529f\u80fd\u3002\u7136\u5f8c\u56e0\u70ba\u5728\u4e0d\u540c\u7684 projects \u6211\u5148\u5f8c\u7528\u4e86 Spark \u7684 Python API \u548c Scala API\uff0c\u6240\u4ee5\u4ee5\u4e0b\u7684\u5167\u5bb9\u53ef\u80fd\u6703\u6df7\u96dc\u8457\u5169\u8005\u7684\u7bc4\u4f8b\u3002Scala API \u8981\u7279\u5225\u6ce8\u610f\u6bcf\u500b method \u63a5\u53d7\u548c\u56de\u50b3\u7684 type \u7684\u5dee\u7570\uff1bPython API \u5c31\u6c92\u6709\u9019\u7a2e\u9650\u5236\u4e86\uff0c\u7562\u7adf\u662f\u52d5\u614b\u8a9e\u8a00\u3002<\/p>\n","protected":false},"author":1,"featured_media":378,"comment_status":"closed","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[97,112,4],"tags":[108,110,2],"class_list":["post-377","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","category-about-ai","category-about-big-data","category-about-python","tag-apache-spark","tag-functional-programming","tag-python"],"_links":{"self":[{"href":"https:\/\/vinta.ws\/code\/wp-json\/wp\/v2\/posts\/377","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/vinta.ws\/code\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/vinta.ws\/code\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/vinta.ws\/code\/wp-json\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/vinta.ws\/code\/wp-json\/wp\/v2\/comments?post=377"}],"version-history":[{"count":0,"href":"https:\/\/vinta.ws\/code\/wp-json\/wp\/v2\/posts\/377\/revisions"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/vinta.ws\/code\/wp-json\/wp\/v2\/media\/378"}],"wp:attachment":[{"href":"https:\/\/vinta.ws\/code\/wp-json\/wp\/v2\/media?parent=377"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/vinta.ws\/code\/wp-json\/wp\/v2\/categories?post=377"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/vinta.ws\/code\/wp-json\/wp\/v2\/tags?post=377"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}