Scala is the first-class language for interacting with Apache Spark, but it can be difficult to learn. This article is mostly about manipulating DataFrames and Datasets in Spark SQL.
Access SparkSession
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
val conf = new SparkConf()
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
implicit val spark: SparkSession = SparkSession
.builder()
.appName("PersonalizedRanker")
.config(conf)
.getOrCreate()
val sc = spark.sparkContext
sc.setLogLevel("WARN")
Before Spark 2.0:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
val conf: SparkConf = new SparkConf()
.setAppName("PersonalizedRanker")
.setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
ref:
https://sparkour.urizone.net/recipes/understanding-sparksession/
https://spark.apache.org/docs/latest/configuration.html
Import Implicits
val spark = SparkSession.builder().getOrCreate()
import spark.implicits._
// you could also access SparkSession via any Dataset or DataFrame
import someDS.sparkSession.implicits._
import someDF.sparkSession.implicits._
val ratingDF = rawDF
.selectExpr("from_user_id AS user", "repo_id AS item", "1 AS rating", "starred_at")
.orderBy($"user", $"starred_at".desc)
Read Data from MySQL
import java.util.Properties
val dbUrl = "jdbc:mysql://mysql:3306/albedo?user=root&password=123&verifyServerCertificate=false&useSSL=false"
val prop = new Properties()
prop.put("driver", "com.mysql.jdbc.Driver")
val rawDF = spark.read.jdbc(dbUrl, "app_repostarring", prop)
ref:
https://docs.databricks.com/spark/latest/data-sources/sql-databases.html
Read Data from an Apache Parquet File
import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.functions.lit
import spark.implicits._
case class RepoStarring(
user_id: Int,
repo_id: Int,
starred_at: java.sql.Timestamp,
starring: Double
)
val dataDir = "."
val today = "20170820"
val savePath = s"$dataDir/spark-data/$today/repoStarringDF.parquet"
val df: DataFrame = try {
spark.read.parquet(savePath)
} catch {
case e: AnalysisException => {
if (e.getMessage().contains("Path does not exist")) {
val df = spark.read.jdbc(dbUrl, "app_repostarring", prop)
.select("user_id", "repo_id", "starred_at")
.withColumn("starring", lit(1.0))
df.write.mode("overwrite").parquet(savePath)
df
} else {
throw e
}
}
}
df.as[RepoStarring]
Read Data from an Apache Avro File
$ spark-submit --packages "com.databricks:spark-avro_2.11:3.2.0"
import com.databricks.spark.avro._
import org.apache.spark.sql.SparkSession
implicit val spark = SparkSession
.builder()
.appName("Word2VecTrainer")
.getOrCreate()
val df = spark.read.avro("./githubarchive_repo_info.avro")
df.show()
ref:
https://github.com/databricks/spark-avro
Create an RDD from a List of Case Class Instances
import org.apache.spark.rdd.RDD
case class WikipediaArticle(title: String, text: String) {
def mentionsLanguage(lang: String): Boolean = text.split(' ').contains(lang)
}
val wikiRdd: RDD[WikipediaArticle] = sc.parallelize(Seq(
WikipediaArticle("a1", "abc Scala xyz"),
WikipediaArticle("a2", "abc Python xyz"),
WikipediaArticle("a3", "abc Scala xyz"),
WikipediaArticle("a4", "abc Scala xyz")
))
Get a Subset of an RDD
val subsetArr = someRDD.take(1000)
val subsetRDD = sc.parallelize(subsetArr)
Create a DataFrame from a Seq
import org.apache.spark.sql.functions._
val df = spark.createDataFrame(Seq(
(1, 1, 1, "2017-05-16 20:01:00.0"),
(1, 2, 1, "2017-05-17 21:01:00.0"),
(2, 1, 0, "2017-05-18 22:01:00.0"),
(3, 1, 1, "2017-05-10 22:01:00.0")
))
.toDF("user", "item", "star", "starred_at")
.withColumn("starred_at", unix_timestamp(col("starred_at")).cast("timestamp"))
// you could override the schema
// val df = spark.createDataFrame(df.rdd, starringSchema)
case class Starring(user: Int, item: Int, star: Int, starred_at: java.sql.Timestamp)
val ds = df.as[Starring]
df.printSchema()
// root
// |-- user: integer (nullable = false)
// |-- item: integer (nullable = false)
// |-- star: integer (nullable = false)
// |-- starred_at: timestamp (nullable = false)
ref:
https://medium.com/@mrpowers/manually-creating-spark-dataframes-b14dae906393
Create a Dataset
import spark.implicits._
val ds1 = spark.read.json("people.json").as[Person]
val ds2 = df.toDS
val ds3 = rdd.toDS
ref:
https://www.coursera.org/learn/scala-spark-big-data/lecture/yrfPh/datasets
Statistics
repoInfoDS.count()
// 16000
repoInfoDS
.describe("language", "size", "stargazers_count", "forks_count", "subscribers_count", "open_issues_count")
.show()
// +-------+--------+-----------------+-----------------+----------------+-----------------+-----------------+
// |summary|language| size| stargazers_count| forks_count|subscribers_count|open_issues_count|
// +-------+--------+-----------------+-----------------+----------------+-----------------+-----------------+
// | count| 16000| 16000| 16000| 16000| 16000| 16000|
// | mean| null| 14777.8928125| 238.5754375| 61.164375| 20.802375| 9.896625|
// | stddev| null|133926.8365289536|1206.220336641683|394.950236056925|84.09955924587845|56.72585435688847|
// | min| ANTLR| 0| 2| 0| 0| 0|
// | max| XSLT| 9427027| 56966| 28338| 4430| 3503|
// +-------+--------+-----------------+-----------------+----------------+-----------------+-----------------+
repoInfoDS.stat.corr("stargazers_count", "forks_count")
// 0.8408082958003276
repoInfoDS.stat.corr("stargazers_count", "size")
// 0.05351197356230549
repoInfoDS.stat.freqItems(Seq("language"), 0.1).show(truncate=false)
// +-----------------------------------------------------------+
// |language_freqItems |
// +-----------------------------------------------------------+
// |[Ruby, Objective-C, C, Go, JavaScript, Swift, Java, Python]|
// +-----------------------------------------------------------+
Generate a Schema from a List of Column Names
import org.apache.spark.sql.types._
def generateSchema(columnNames: List[String]): StructType = {
val fields = columnNames.map({
case columnName: String if columnName == "tucaseid" => {
StructField(columnName, StringType, nullable = false)
}
case columnName => {
StructField(columnName, DoubleType, nullable = false)
}
})
StructType(fields.toList)
}
val columnNames = List("tucaseid", "gemetsta", "gtmetsta", "peeduca", "pehspnon", "ptdtrace", "teage")
val schema = generateSchema(columnNames)
Generate a Schema from a Case Class
import java.sql.Timestamp
import org.apache.spark.sql.Encoders
case class Starring(user: Int, item: Int, star: Int, starred_at: Timestamp)
val starringSchema = Encoders.product[Starring].schema
ref:
https://stackoverflow.com/questions/36746055/generate-a-spark-structtype-schema-from-a-case-class
Change a DataFrame's Schema
val newDF = spark.createDataFrame(oldDF.rdd, starringSchema)
Print the Vector Length of Specific Columns
import org.apache.spark.ml.linalg.Vector
userProfileDF.select("languages_preferences_w2v", "clean_bio_w2v")
.head
.toSeq.foreach((field: Any) => {
println(field.asInstanceOf[Vector].size)
})
Convert a DataFrame into a Dataset
import java.sql.Timestamp
import org.apache.spark.ml.feature.SQLTransformer
case class Starring(user: Int, item: Int, star: Int, starred_at: Timestamp)
val starringBuilder = new SQLTransformer()
val starringSQL = """
SELECT from_user_id AS user, repo_id AS item, 1 AS star, starred_at
FROM __THIS__
ORDER BY user, starred_at DESC
"""
starringBuilder.setStatement(starringSQL)
val starringDS = starringBuilder.transform(rawDF).as[Starring]
Convert a Dataset into a PairRDD
val pairRDD = dataset
.select("user", "item")
.rdd
.map(row => (row(0), row(1)))
// or
case class UserItemPair(user: Int, item: Int)
val pairRDD = dataset
.select("user", "item").as[UserItemPair]
.rdd
.map(row => (row.user, row.item))
// or
import org.apache.spark.sql.Row
val pairRDD = dataset
.select("user", "item")
.rdd
.map({
case Row(user: Int, item: Int) => (user, item)
})
ref:
https://stackoverflow.com/questions/30655914/map-rdd-to-pairrdd-in-scala
Convert a Dataset into a List or Set
val popularRepos = popularReposDS
.select("item")
.rdd
.map(r => r(0).asInstanceOf[Int])
.collect()
.to[List]
// or
val popularRepos = popularReposDS
.select($"repo_id".as[Int])
.collect()
.to[List]
// List(98837133, 97071697, 17439026, ...)
Add a Column to a DataFrame
import org.apache.spark.sql.functions.lit
val newDF = df.withColumn("starring", lit(1))
Repeat withColumn()
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lower}
val booleanColumnNames = Array("fork", "has_issues", "has_projects", "has_downloads", "has_wiki", "has_pages")
val convertedRepoInfoDF = booleanColumnNames.foldLeft[DataFrame](cleanRepoInfoDF)(
(accDF, columnName) => accDF.withColumn(s"clean_$columnName", col(columnName).cast("int"))
)
// or
val lowerableColumnNames = Array("description", "language", "topics")
val booleanColumnNames = Array("fork", "has_issues", "has_projects", "has_downloads", "has_wiki", "has_pages")
val convertedRepoInfoDF = (lowerableColumnNames ++ booleanColumnNames).foldLeft[DataFrame](cleanRepoInfoDF)((accDF, columnName) => {
columnName match {
case _ if lowerableColumnNames.contains(columnName) =>
accDF.withColumn(s"clean_$columnName", lower(col(columnName)))
case _ if booleanColumnNames.contains(columnName) =>
accDF.withColumn(s"clean_$columnName", col(columnName).cast("int"))
}
})
Flag Rows that Have Null in Any of Certain Columns
import org.apache.spark.sql.functions.when
val nullableColumnNames = Array("bio", "blog", "company", "email", "location", "name")
val df2 = df1.withColumn("has_null", when(nullableColumnNames.map(df1(_).isNull).reduce(_||_), true).otherwise(false))
// or
val df2 = df1.withColumn("has_null", when($"bio".isNull or
$"blog".isNull or
$"company".isNull or
$"email".isNull or
$"location".isNull or
$"name".isNull, true).otherwise(false))
Combine Two Columns into a Struct
import org.apache.spark.sql.functions._
fixRepoInfoDS
.where("topics != ''")
.withColumn("tags", struct("language", "topics"))
.select("tags")
.show(truncate=false)
// +----------------------------------------------------------------------+
// |tags |
// +----------------------------------------------------------------------+
// |[Ruby,aasm,activerecord,mongoid,ruby,state-machine,transition] |
// |[Ruby,captcha,rails,recaptcha,ruby,sinatra] |
// |[Python,commandline,gtd,python,sqlite,todo] |
// ...
// +----------------------------------------------------------------------+
Select a Nested Column Directly
userRecommendationsDF
// +----+-----------------------------------------------------------------+
// |user|recommendations |
// +----+-----------------------------------------------------------------+
// |1 |[[2,0.9997897], [4,0.9988761], [1,0.9988487], [5,0.0]] |
// |3 |[[5,0.9984464], [1,2.9802322E-8], [2,0.0], [4,0.0]] |
// |2 |[[4,0.9921428], [2,0.10759391], [1,0.10749264], [5,1.4901161E-8]]|
// +----+-----------------------------------------------------------------+
userRecommendationsDF.printSchema()
// root
// |-- user: integer (nullable = false)
// |-- recommendations: array (nullable = true)
// | |-- element: struct (containsNull = true)
// | | |-- item: integer (nullable = true)
// | | |-- rating: float (nullable = true)
val userItemsDF = userRecommendationsDF.select($"user", $"recommendations.item")
// +----+------------+
// |user| item|
// +----+------------+
// | 1|[2, 4, 1, 5]|
// | 3|[5, 1, 2, 4]|
// | 2|[4, 2, 1, 5]|
// +----+------------+
ref:
https://stackoverflow.com/questions/38413040/spark-sql-udf-with-complex-input-parameter
Flatten an Array Column
import org.apache.spark.sql.Row
val userRecommenderItemDF = alsModel.recommendForAllUsers(15)
.flatMap((row: Row) => {
val userID = row.getInt(0)
val recommendations = row.getSeq[Row](1)
recommendations.map {
case Row(repoID: Int, score: Float) => {
(userID, repoID, score, "als")
}
}
})
.toDF("user_id", "repo_id", "score", "source")
userRecommenderItemDF.show()
// +-------+-------+-------------+------+
// |user_id|repo_id|score |source|
// +-------+-------+-------------+------+
// |2142 |48 |0.05021151 |als |
// |2142 |118 |0.05021151 |als |
// |2142 |68 |0.7791124 |als |
// |2142 |98 |0.9939307 |als |
// |2142 |28 |0.53719014 |als |
// +-------+-------+-------------+------+
Define Conditional Columns
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._
val workingStatusProjection: Column = when($"telfs" >= 1 && $"telfs" < 3, "working").
otherwise("not working").
as("working")
val ageProjection: Column = when($"teage" >= 15 && $"teage" <= 22 , "young").
when($"teage" >= 23 && $"teage" <= 55 , "active").
otherwise("elder").
as("age")
val workProjection: Column = workColumns.reduce(_ + _).divide(60).as("work_hours") // workColumns is a Seq[Column] of minute-count columns
val newDF = df.select(workingStatusProjection, ageProjection, workProjection)
// +-----------+------+------------------+
// | working| age| work_hours|
// +-----------+------+------------------+
// | working| elder| 0.0|
// | working|active| 0.0|
// | working|active| 0.0|
// |not working|active| 2.0|
// | working|active| 8.583333333333334|
// +-----------+------+------------------+
Create a Custom Transformation Function for any DataFrame
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.lit
def withGreeting(df: DataFrame): DataFrame = {
df.withColumn("greeting", lit("hello world"))
}
def withCat(name: String)(df: DataFrame): DataFrame = {
df.withColumn("cat", lit(s"$name meow"))
}
val df = Seq(
"funny",
"person"
).toDF("something")
val weirdDf = df
.transform(withGreeting)
.transform(withCat("kitten"))
// +---------+-----------+-----------+
// |something| greeting| cat|
// +---------+-----------+-----------+
// | funny|hello world|kitten meow|
// | person|hello world|kitten meow|
// +---------+-----------+-----------+
ref:
https://medium.com/@mrpowers/chaining-custom-dataframe-transformations-in-spark-a39e315f903c
Combine Two Arrays from Two DataFrames
val bothItemsRDD = userPredictedItemsDF.join(userActualItemsDF, Seq("user_id"))
.select(userPredictedItemsDF.col("items"), userActualItemsDF.col("items"))
.rdd
.map({
case Row(userPredictedItems: Seq[Int], userActualItems: Seq[Int]) => {
(userPredictedItems.slice(0, $(k)).toArray, userActualItems.slice(0, $(k)).toArray) // $(k) reads the "k" Param inside a Spark ML Params class
}
})
Handle Missing Values (NaN and Null)
NaN stands for "Not a Number"; it's usually the result of a mathematical operation that doesn't make sense, e.g. 0.0 / 0.0.
val df1 = repoInfoDF.na.fill("")
val df2 = repoInfoDF.na.fill("", Seq("description", "homepage"))
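The na.fill("") above only replaces null in string columns; for NaN in numeric (Double) columns, a minimal sketch (someDF and its score column are hypothetical):
import org.apache.spark.sql.functions.isnan
// count rows whose score is NaN
someDF.where(isnan($"score")).count()
// drop rows that contain null or NaN in the given columns
val df3 = someDF.na.drop(Seq("score"))
// replace both null and NaN with a default value in numeric columns
val df4 = someDF.na.fill(0.0, Seq("score"))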
ref:
https://stackoverflow.com/questions/33089781/spark-dataframe-handing-empty-string-in-onehotencoder
https://stackoverflow.com/questions/43882699/which-differences-there-are-between-null-and-nan-in-spark-how-to-deal-with
Calculate the Difference between Two Dates
import org.apache.spark.sql.functions._
df.withColumn("updated_at_days_since_today", datediff(current_date(), $"updated_at"))
ref:
https://databricks.com/blog/2015/09/16/apache-spark-1-5-dataframe-api-highlights.html
Create a User-Defined Function (UDF) which Accepts One Column
import org.apache.spark.sql.functions.udf
val df = Seq(
(1, "Company Name Inc."),
(2, "Company Name Co."),
(3, "Company Name ")
).toDF("id", "company")
val removeUninformativeWords = (text: String) => {
text
.toLowerCase()
// use replace() for literal matching; replaceAll() treats the pattern as a regex, so "." would match any character
.replace(", inc.", "")
.replace(" inc.", "")
.replace("-inc", "")
.replace(" co., ltd.", "")
.replace(" co.", "")
.replace(" ltd.", "")
.replace(".com", "")
.replace("@", "")
.trim()
}
val removeUninformativeWordsUDF = udf(removeUninformativeWords)
val newDF = df.withColumn("fix_company", removeUninformativeWordsUDF($"company"))
newDF.show()
ref:
https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-sql-udfs.html
Create a User-Defined Function (UDF) which Accepts Multiple Columns
When defining your UDFs, you must use Row for mapping StructType instead of any custom case class.
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.Row
userRecommendationsDF.printSchema()
// root
// |-- user: integer (nullable = false)
// |-- recommendations: array (nullable = true)
// | |-- element: struct (containsNull = true)
// | | |-- item: integer (nullable = true)
// | | |-- rating: float (nullable = true)
val extractItemsUDF = udf((user: Int, recommendations: Seq[Row]) => {
val items: Seq[Int] = recommendations.map(_.getInt(0))
val ratings: Seq[Float] = recommendations.map(_.getFloat(1))
items
})
userRecommendationsDF.withColumn("items", extractItemsUDF($"user", $"recommendations"))
// +----+-----------------------------------------------------------------+------------+
// |user|recommendations |items |
// +----+-----------------------------------------------------------------+------------+
// |1 |[[2,0.9997897], [4,0.9988761], [1,0.9988487], [5,0.0]] |[2, 4, 1, 5]|
// |3 |[[5,0.9984464], [1,2.9802322E-8], [2,0.0], [4,0.0]] |[5, 1, 2, 4]|
// |2 |[[4,0.9921428], [2,0.10759391], [1,0.10749264], [5,1.4901161E-8]]|[4, 2, 1, 5]|
// +----+-----------------------------------------------------------------+------------+
The "GenericRowWithSchema cannot be cast to XXX" issue: there is a strict mapping between Spark SQL data types and Scala types, such as IntegerType vs. Int, TimestampType vs. java.sql.Timestamp, and StructType vs. org.apache.spark.sql.Row.
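For example, with the recommendations column above (an array of structs), a UDF parameter typed with a custom case class compiles but fails at runtime, while Row works; a sketch (RecommendationItem is a hypothetical case class):
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf
case class RecommendationItem(item: Int, rating: Float)
// compiles, but throws "GenericRowWithSchema cannot be cast to RecommendationItem" at runtime
val extractItemsWrongUDF = udf((recommendations: Seq[RecommendationItem]) => recommendations.map(_.item))
// each struct element must be handled as a Row instead
val extractItemsRightUDF = udf((recommendations: Seq[Row]) => recommendations.map(_.getInt(0)))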
ref:
https://stackoverflow.com/questions/38413040/spark-sql-udf-with-complex-input-parameter
https://spark.apache.org/docs/latest/sql-programming-guide.html#data-types
Create a User-Defined Function (UDF) with Extra Parameters
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions._
import scala.util.control.Breaks._
val df = spark.createDataFrame(Seq(
("Bob", 35, "Web Backend Developer"),
("Cathy", 24, "UX Designer"),
("Vic", 26, "PM"),
("Tom", 29, "Back end Engineer")
)).toDF("name", "age", "occupation")
// the return type of this UDF is Double
def containsAnyOfUDF(substrings: Array[String]): UserDefinedFunction = udf[Double, String]((text: String) => {
var result = 0.0
breakable {
for (substring <- substrings) {
if (text.contains(substring)) {
result = 1.0
break
}
}
}
result
})
val backends = Array("backend", "back end")
df.withColumn("knows_backend", containsAnyOfUDF(backends)(lower($"occupation"))))
// +-----+---+---------------------+-------------+
// |name |age|occupation |knows_backend|
// +-----+---+---------------------+-------------+
// |Bob |35 |Web Backend Developer|1.0 |
// |Cathy|24 |UX Designer |0.0 |
// |Vic |26 |PM |0.0 |
// |Tom |29 |Back end Engineer |1.0 |
// +-----+---+---------------------+-------------+
ref:
https://stackoverflow.com/questions/35546576/how-can-i-pass-extra-parameters-to-udfs-in-sparksql
Select Rows that Contain a Certain String in a DataFrame
userInfoDF
.where("company LIKE '%Inc%'")
.orderBy($"company".desc)
Order By One of the Elements in a List Column
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.functions.udf
val to_array = udf((v: Vector) => v.toDense.values)
resultTestDF
.where("user_id = 652070")
.orderBy(to_array($"probability").getItem(1).desc)
.select("user_id", "repo_id", "starring", "prediction", "probability")
.show(false)
Show Distinct Values
df.select($"company").distinct().orderBy($"company".desc)
import org.apache.spark.sql.functions.approx_count_distinct
imputedUserInfoDF.select(approx_count_distinct($"company")).show()
// 39581
Group By then Aggregate
import org.apache.spark.sql.functions.count
val df = userInfoDF
.groupBy($"company")
.agg(count("user_id").alias("count"))
.orderBy($"count".desc)
.limit(1000)
Group By then Get First/Top n Items
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{collect_list, rank}
val topN = 10
val window = Window.partitionBy($"user_id").orderBy($"starred_at".desc)
val userLanguagesDF = repoInfoStarringDF
.withColumn("rank", rank.over(window))
.where($"rank" <= topN)
.groupBy($"user_id")
.agg(collect_list($"language").alias("languages"))
// +--------+-----------------------------------------------------------------+
// |user_id |languages |
// +--------+-----------------------------------------------------------------+
// |2142 |[JavaScript, Python, Python, Ruby, Go] |
// |59990 |[Python, JavaScript, Go, JavaScript, Go] |
// |101094 |[JavaScript, C++, Max, C, JavaScript] |
// |109622 |[PHP, PHP, PHP, PHP, Python] |
// |201694 |[TypeScript, Python, Vim script, Vim script, Python] |
// +--------+-----------------------------------------------------------------+
ref:
https://stackoverflow.com/questions/33655467/get-topn-of-all-groups-after-group-by-using-spark-dataframe
https://stackoverflow.com/questions/35918262/spark-topn-after-groupby
or
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number
val df = spark.createDataFrame(Seq(
(2, 1, 0, "2017-05-18 22:01:00.0"),
(1, 1, 1, "2017-05-16 20:01:00.0"),
(2, 5, 0, "2017-05-21 22:01:00.0"),
(1, 2, 1, "2017-05-17 21:01:00.0"),
(1, 4, 1, "2017-05-17 22:01:00.0"),
(1, 7, 1, "2017-05-12 22:01:00.0"),
(3, 1, 1, "2017-05-19 22:01:00.0"),
(3, 2, 1, "2017-05-30 22:01:00.0"),
(3, 5, 1, "2017-05-01 22:01:00.0"),
(1, 6, 1, "2017-05-19 22:01:00.0")
)).toDF("user_id", "repo_id", "starring", "starred_at")
val windowSpec = Window.partitionBy($"user_id").orderBy($"starred_at".desc)
val userActualItemsDF = df
.withColumn("row_number", row_number().over(windowSpec))
.where($"row_number" <= 3)
// +-------+-------+--------+---------------------+----------+
// |user_id|repo_id|starring|starred_at |row_number|
// +-------+-------+--------+---------------------+----------+
// |1 |6 |1 |2017-05-19 22:01:00.0|1 |
// |1 |4 |1 |2017-05-17 22:01:00.0|2 |
// |1 |2 |1 |2017-05-17 21:01:00.0|3 |
// |3 |2 |1 |2017-05-30 22:01:00.0|1 |
// |3 |1 |1 |2017-05-19 22:01:00.0|2 |
// |3 |5 |1 |2017-05-01 22:01:00.0|3 |
// |2 |5 |0 |2017-05-21 22:01:00.0|1 |
// |2 |1 |0 |2017-05-18 22:01:00.0|2 |
// +-------+-------+--------+---------------------+----------+
ref:
https://stackoverflow.com/questions/33878370/how-to-select-the-first-row-of-each-group
http://xinhstechblog.blogspot.tw/2016/04/spark-window-functions-for-dataframes.html
Group By then Concatenate Strings
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
val userTopicsDF = repoInfoStarringDF
.where($"topics" =!= "")
.withColumn("rank", rank.over(Window.partitionBy($"user_id").orderBy($"starred_at".desc)))
.where($"rank" <= 50)
.groupBy($"user_id")
.agg(concat_ws(",", collect_list($"topics")).alias("topics_preferences"))
// +--------+-------------------------------------------------------------------+
// | user_id| topics_preferences|
// +--------+-------------------------------------------------------------------+
// | 2142|go,golang,grpc,microservice,protobuf,consul,go,golang,load-balancer|
// | 59990|api,api-gateway,aws,aws-infrastructure,aws-lambda,deploy-tool |
// | 101094|ableton,javascript,maxmsp,c,c89,gui,imgui,nuklear |
// | 109622|stripe,algolia,magento,magento-algoliasearch,php,search,aws |
// +--------+-------------------------------------------------------------------+
Group By after Order By Maintains Order
Note that Spark does not formally guarantee this: collect_list() after groupBy() may not preserve the ordering of a preceding orderBy() once the data is shuffled, so prefer the window-function approach above when ordering matters.
val list = Seq(
(2, 1, 0, "2017-05-18 22:01:00.0"),
(1, 1, 1, "2017-05-16 20:01:00.0"),
(2, 5, 0, "2017-05-21 22:01:00.0"),
(1, 2, 1, "2017-05-17 21:01:00.0"),
(1, 4, 1, "2017-05-17 22:01:00.0"),
(3, 1, 1, "2017-05-19 22:01:00.0"),
(3, 2, 1, "2017-05-30 22:01:00.0"),
(3, 5, 1, "2017-05-01 22:01:00.0"),
(1, 6, 1, "2017-05-19 22:01:00.0")
)
val df = list.toDF("user_id", "repo_id", "starring", "starred_at")
val userActualItemsDF = df
.orderBy($"starred_at".desc)
.groupBy($"user_id")
.agg(collect_list($"repo_id").alias("items"))
// +-------+------------+
// |user_id| items|
// +-------+------------+
// | 1|[6, 4, 2, 1]|
// | 3| [2, 1, 5]|
// | 2| [5, 1]|
// +-------+------------+
collect_list on Multiple Columns
import org.apache.spark.sql.functions.{collect_list, struct}
predictionDF
// +----+----+------+----------+
// |user|item|rating|prediction|
// +----+----+------+----------+
// | 1| 1| 12| 0.9988487|
// | 3| 5| 8| 0.9984464|
// | 1| 4| 4|0.99887615|
// | 2| 4| 1| 0.9921428|
// | 1| 2| 90| 0.9997897|
// +----+----+------+----------+
val userRecommendationsDF = predictionDF
.groupBy($"user")
.agg(collect_list(struct($"item", $"prediction")).alias("recommendations"))
// +----+----------------------------------------------+
// |user|recommendations |
// +----+----------------------------------------------+
// |1 |[[1,0.9988487], [4,0.99887615], [2,0.9997897]]|
// |3 |[[5,0.9984464]] |
// |2 |[[4,0.9921428]] |
// +----+----------------------------------------------+
groupByKey() on Dataset
val keyValues = List(
(3, "Me"), (1, "Thi"), (2, "Se"), (3, "ssa"), (1, "sIsA"), (3, "ge:"), (3, "-"), (2, "cre"), (2, "t")
)
val keyValuesDS = keyValues.toDS
val ds = keyValuesDS
.groupByKey(p => p._1)
.mapValues(p => p._2)
.reduceGroups((acc, str) => acc + str)
ref:
https://www.coursera.org/learn/scala-spark-big-data/lecture/yrfPh/datasets
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.expressions.scalalang.typed
import org.apache.spark.sql.functions.round
case class TimeUsageRow(
working: String,
sex: String,
age: String,
primaryNeeds: Double,
work: Double,
other: Double
)
val summaryDs = loadTimeUsageSummary() // returns a Dataset[TimeUsageRow], prepared elsewhere
// +-----------+------+------+------------------+------------------+------------------+
// | working| sex| age| primaryNeeds| work| other|
// +-----------+------+------+------------------+------------------+------------------+
// | working| male| elder| 15.25| 0.0| 8.75|
// | working|female|active|13.833333333333334| 0.0|10.166666666666666|
// | working|female|active|11.916666666666666| 0.0|12.083333333333334|
// |not working|female|active|13.083333333333334| 2.0| 8.916666666666666|
// | working| male|active|11.783333333333333| 8.583333333333334|3.6333333333333333|
// +-----------+------+------+------------------+------------------+------------------+
val finalDs = summaryDs
.groupByKey((row: TimeUsageRow) => {
(row.working, row.sex, row.age)
})
.agg(
round(typed.avg[TimeUsageRow](_.primaryNeeds), 1).as(Encoders.DOUBLE),
round(typed.avg[TimeUsageRow](_.work), 1).as(Encoders.DOUBLE),
round(typed.avg[TimeUsageRow](_.other), 1).as(Encoders.DOUBLE)
)
.map({
case ((working, sex, age), primaryNeeds, work, other) => {
TimeUsageRow(working, sex, age, primaryNeeds, work, other)
}
})
.orderBy("working", "sex", "age")
.as[TimeUsageRow]
// +-----------+------+------+------------+----+-----+
// | working| sex| age|primaryNeeds|work|other|
// +-----------+------+------+------------+----+-----+
// |not working|female|active| 12.4| 0.5| 10.8|
// |not working|female| elder| 10.9| 0.4| 12.4|
// |not working|female| young| 12.5| 0.2| 11.1|
// |not working| male|active| 11.4| 0.9| 11.4|
// |not working| male| elder| 10.7| 0.7| 12.3|
// +-----------+------+------+------------+----+-----+
Aggregator
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.expressions.Aggregator
val myAgg = new Aggregator[IN, BUF, OUT] {
def zero: BUF = ??? // the initial value
def reduce(b: BUF, a: IN): BUF = ??? // add an element to the running total
def merge(b1: BUF, b2: BUF): BUF = ??? // merge intermediate values
def finish(b: BUF): OUT = ??? // return the final result
override def bufferEncoder: Encoder[BUF] = ???
override def outputEncoder: Encoder[OUT] = ???
}.toColumn
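A minimal concrete sketch, assuming a Dataset[Rating] of the hypothetical case class below: a typed Aggregator that computes an average rating.
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator
case class Rating(user: Int, item: Int, rating: Double)
val avgRating = new Aggregator[Rating, (Double, Long), Double] {
def zero: (Double, Long) = (0.0, 0L) // running (sum, count)
def reduce(b: (Double, Long), a: Rating): (Double, Long) = (b._1 + a.rating, b._2 + 1)
def merge(b1: (Double, Long), b2: (Double, Long)): (Double, Long) = (b1._1 + b2._1, b1._2 + b2._2)
def finish(b: (Double, Long)): Double = b._1 / b._2
override def bufferEncoder: Encoder[(Double, Long)] = Encoders.tuple(Encoders.scalaDouble, Encoders.scalaLong)
override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}.toColumn
// usage: ratingDS.groupByKey(_.user).agg(avgRating).show()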
ref:
https://www.coursera.org/learn/scala-spark-big-data/lecture/yrfPh/datasets
Join Two DataFrames
val starringDF = spark.createDataFrame(Seq(
(652070, 21289110, 1),
(652070, 4164482, 1),
(652070, 44838949, 0),
(1912583, 4164482, 0)
)).toDF("user_id", "repo_id", "starring")
val repoDF = spark.createDataFrame(Seq(
(21289110, "awesome-python", "Python", 37336),
(4164482, "django", "Python", 27451),
(44838949, "swift", "C++", 39840)
)).toDF("id", "repo_name", "repo_language", "stargazers_count")
val fullDF = starringDF.join(repoDF, starringDF.col("repo_id") === repoDF.col("id"))
// +-------+--------+--------+--------+--------------+-------------+----------------+
// |user_id| repo_id|starring| id| repo_name|repo_language|stargazers_count|
// +-------+--------+--------+--------+--------------+-------------+----------------+
// | 652070|21289110| 1|21289110|awesome-python| Python| 37336|
// | 652070| 4164482| 1| 4164482| django| Python| 27451|
// | 652070|44838949| 0|44838949| swift| C++| 39840|
// |1912583| 4164482| 0| 4164482| django| Python| 27451|
// +-------+--------+--------+--------+--------------+-------------+----------------+
ref:
https://docs.databricks.com/spark/latest/faq/join-two-dataframes-duplicated-column.html