Spark ML cookbook (Scala)

Train an ALS model

import org.apache.spark.ml.recommendation.ALS

val df = spark.createDataFrame(Seq(
  (1, 1, 12),
  (1, 2, 90),
  (1, 4, 4),
  (2, 4, 1),
  (3, 5, 8)
)).toDF("user", "item", "rating")

val als = new ALS()
  .setImplicitPrefs(true)
  .setRank(5)
  .setRegParam(0.5)
  .setAlpha(40)
  .setMaxIter(10)
  .setSeed(42)
  .setColdStartStrategy("drop")
val alsModel = als.fit(df)

val userRecommendationsDF = alsModel.recommendForAllUsers(15)
// +----+-----------------------------------------------------------------+
// |user|recommendations                                                  |
// +----+-----------------------------------------------------------------+
// |1   |[[2,0.9997897], [4,0.9988761], [1,0.9988487], [5,0.0]]           |
// |3   |[[5,0.9984464], [1,2.9802322E-8], [2,0.0], [4,0.0]]              |
// |2   |[[4,0.9921428], [2,0.10759391], [1,0.10749264], [5,1.4901161E-8]]|
// +----+-----------------------------------------------------------------+

userRecommendationsDF.printSchema()
// root
// |-- user: integer (nullable = false)
// |-- recommendations: array (nullable = true)
// |    |-- element: struct (containsNull = true)
// |    |    |-- item: integer (nullable = true)
// |    |    |-- rating: float (nullable = true)
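The complementary call, recommendForAllItems, produces the top users for every item (a minimal sketch with the same fitted model):

```scala
// top 15 users recommended for each item, as an array of (user, rating) structs
val itemRecommendationsDF = alsModel.recommendForAllItems(15)
```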

ref:
https://spark.apache.org/docs/latest/ml-collaborative-filtering.html

Convert a string categorical feature into numeric one

StringIndexer converts labels (categorical values) into numbers (0.0, 1.0, 2.0 and so on), ordered by label frequency: the most frequent label gets index 0.0. It can also handle previously unseen labels via optional handleInvalid strategies.

StringIndexer's inputCol accepts string, numeric and boolean types.

val df1 = spark.createDataFrame(Seq(
    (1, "Python"),
    (2, "C++"),
    (3, "C++"),
    (4, "JavaScript"),
    (5, "Python"),
    (6, "Python"),
    (7, "Go")
)).toDF("repo_id", "repo_language")

val df2 = spark.createDataFrame(Seq(
    (1, "Python"),
    (2, "C++"),
    (3, "C++"),
    (4, "JavaScript"),
    (5, "Python"),
    (6, "Python"),
    (7, "Go"),
    (8, "JavaScript"),
    (9, "Brainfuck"),
    (10, "Brainfuck"),
    (11, "Red")
)).toDF("repo_id", "repo_language")

import org.apache.spark.ml.feature.StringIndexer

val stringIndexer = new StringIndexer()
  .setInputCol("repo_language")
  .setOutputCol("repo_language_index")
  .setHandleInvalid("keep")
val stringIndexerModel = stringIndexer.fit(df1)

stringIndexerModel.labels
// Array[String] = Array(Python, C++, JavaScript, Go)

val indexedDF = stringIndexerModel.transform(df2)
indexedDF.show()
// +-------+-------------+-------------------+
// |repo_id|repo_language|repo_language_index|
// +-------+-------------+-------------------+
// |      1|       Python|                0.0|
// |      2|          C++|                1.0|
// |      3|          C++|                1.0|
// |      4|   JavaScript|                2.0|
// |      5|       Python|                0.0|
// |      6|       Python|                0.0|
// |      7|           Go|                3.0|
// |      8|   JavaScript|                2.0|
// |      9|    Brainfuck|                4.0| <- previously unseen
// |     10|    Brainfuck|                4.0| <- previously unseen
// |     11|          Red|                4.0| <- previously unseen
// +-------+-------------+-------------------+

ref:
https://spark.apache.org/docs/latest/ml-features.html#stringindexer
https://stackoverflow.com/questions/34681534/spark-ml-stringindexer-handling-unseen-labels
https://stackoverflow.com/questions/32277576/how-to-handle-categorical-features-with-spark-ml/32278617

Convert an indexed numeric feature back to the original categorical one

import org.apache.spark.ml.feature.IndexToString

val indexToString = new IndexToString()
  .setInputCol("repo_language_index")
  .setOutputCol("repo_language_ori")

val oriIndexedDF = indexToString.transform(indexedDF)
oriIndexedDF.show()
// +-------+-------------+-------------------+----------------------+
// |repo_id|repo_language|repo_language_index|     repo_language_ori|
// +-------+-------------+-------------------+----------------------+
// |      1|       Python|                0.0|                Python|
// |      2|          C++|                1.0|                   C++|
// |      3|          C++|                1.0|                   C++|
// |      4|   JavaScript|                2.0|            JavaScript|
// |      5|       Python|                0.0|                Python|
// |      6|       Python|                0.0|                Python|
// |      7|           Go|                3.0|                    Go|
// |      8|   JavaScript|                2.0|            JavaScript|
// |      9|    Brainfuck|                4.0|             __unknown| <- previously unseen
// |     10|    Brainfuck|                4.0|             __unknown| <- previously unseen
// |     11|          Red|                4.0|             __unknown| <- previously unseen
// +-------+-------------+-------------------+----------------------+

ref:
https://spark.apache.org/docs/latest/ml-features.html#indextostring

One-hot encoding for categorical features

OneHotEncoder's input column only accepts numeric types. If you have string columns, you need to use StringIndexer to transform them into doubles first. Besides, StringIndexer properly handles unseen values. In my humble opinion, you should always apply StringIndexer before OneHotEncoder.
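A minimal sketch of that recommended order, chaining the two stages in a Pipeline (this uses the Spark 2.x transformer-style OneHotEncoder, as elsewhere in this section; the DataFrame and column names are illustrative):

```scala
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}

val langDF = spark.createDataFrame(Seq(
  (1, "Python"),
  (2, "C++"),
  (3, "Python"),
  (4, "Go")
)).toDF("repo_id", "repo_language")

val indexer = new StringIndexer()
  .setInputCol("repo_language")
  .setOutputCol("repo_language_index")
  .setHandleInvalid("keep")

val encoder = new OneHotEncoder()
  .setInputCol("repo_language_index")
  .setOutputCol("repo_language_one_hot")

// fit() learns the label -> index mapping; transform() applies both stages
val pipelineModel = new Pipeline().setStages(Array(indexer, encoder)).fit(langDF)
val oneHotDF = pipelineModel.transform(langDF)
```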

import org.apache.spark.ml.feature.OneHotEncoder

val knownDF = spark.createDataFrame(Seq(
  (2, "b"),
  (3, "c"),
  (0, "x"),
  (6, "c"),
  (4, "a"),
  (1, "a"),
  (5, "a")
)).toDF("category_1", "category_2")

val unseenDF = spark.createDataFrame(Seq(
  (123, "e"),
  (6, "c"),
  (2, "b"),
  (456, "c"),
  (1, "a")
)).toDF("category_1", "category_2")

val knownOneHotDF = new OneHotEncoder()
  .setDropLast(true)
  .setInputCol("category_1")
  .setOutputCol("category_1_one_hot")
  .transform(knownDF)
knownOneHotDF.show()
// +----------+----------+------------------+
// |category_1|category_2|category_1_one_hot|
// +----------+----------+------------------+
// |         2|         b|     (6,[2],[1.0])|
// |         3|         c|     (6,[3],[1.0])|
// |         0|         x|     (6,[0],[1.0])|
// |         6|         c|         (6,[],[])|
// |         4|         a|     (6,[4],[1.0])|
// |         1|         a|     (6,[1],[1.0])|
// |         5|         a|     (6,[5],[1.0])|
// +----------+----------+------------------+

val unseenOneHotDF = new OneHotEncoder()
  .setDropLast(true)
  .setInputCol("category_1")
  .setOutputCol("category_1_one_hot")
  .transform(unseenDF)
unseenOneHotDF.show()
// +----------+----------+------------------+
// |category_1|category_2|category_1_one_hot|
// +----------+----------+------------------+
// |       123|         e| (456,[123],[1.0])|
// |         6|         c|   (456,[6],[1.0])|
// |         2|         b|   (456,[2],[1.0])|
// |       456|         c|       (456,[],[])|
// |         1|         a|   (456,[1],[1.0])|
// +----------+----------+------------------+

ref:
https://spark.apache.org/docs/latest/ml-features.html#onehotencoder
https://stackoverflow.com/questions/33089781/spark-dataframe-handing-empty-string-in-onehotencoder
https://stackoverflow.com/questions/32277576/how-to-handle-categorical-features-with-spark-ml/40615508

Create a regular expression Tokenizer

With setGaps(true), the pattern matches the delimiters (the gaps between tokens); with setGaps(false), the pattern matches the tokens themselves.

import org.apache.spark.ml.feature.RegexTokenizer
import org.apache.spark.sql.functions._

val sentenceDF = spark.createDataFrame(Seq(
  (1, "Hi, I heard about Spark"),
  (2, "I wish Java could use case classes."),
  (3, "Deep,Learning,models,are,state-of-the-art"),
  (4, "fuck_yeah!!! No.")
)).toDF("id", "sentence")

val countTokensUDF = udf((words: Seq[String]) => words.length)

val regexTokenizer = new RegexTokenizer()
  .setInputCol("sentence")
  .setOutputCol("words")
  .setPattern("[\\w-_]+").setGaps(false)
  // .setPattern("\\W").setGaps(true)
  // .setPattern("[,. ]").setGaps(true)
val tokenizedDF = regexTokenizer.transform(sentenceDF)

val df = tokenizedDF
  .select("sentence", "words")
  .withColumn("count", countTokensUDF($"words"))
// +-----------------------------------------+-----------------------------------------------+-----+
// |sentence                                 |words                                          |count|
// +-----------------------------------------+-----------------------------------------------+-----+
// |Hi, I heard about Spark                  |[hi, i, heard, about, spark]                   |5    |
// |I wish Java could use case classes.      |[i, wish, java, could, use, case, classes]     |7    |
// |Deep,Learning,models,are,state-of-the-art|[deep, learning, models, are, state-of-the-art]|5    |
// |fuck_yeah!!! No.                         |[fuck_yeah, no]                                |2    |
// +-----------------------------------------+-----------------------------------------------+-----+

ref:
https://spark.apache.org/docs/latest/ml-features.html#tokenizer

Train a Word2Vec model

import org.apache.spark.ml.feature.Word2Vec
import org.apache.spark.ml.feature.RegexTokenizer
import org.apache.spark.ml.feature.StopWordsRemover
import org.apache.spark.sql.functions._

val tempDF = userInfoDF.where("bio != ''")

val regexTokenizer = new RegexTokenizer()
  .setToLowercase(true)
  .setInputCol("bio")
  .setOutputCol("bio_words")
  .setPattern("\\W").setGaps(true)
val tokenizedDF = regexTokenizer.transform(tempDF)

val stopWordsRemover = new StopWordsRemover()
  .setInputCol("bio_words")
  .setOutputCol("bio_filtered_words")
val wordsFilteredDF = stopWordsRemover.transform(tokenizedDF)

val word2Vec = new Word2Vec()
  .setInputCol("bio_filtered_words")
  .setOutputCol("bio_word2vec")
  .setVectorSize(100)
  .setWindowSize(5)
  .setMinCount(0)
val word2VecModel = word2Vec.fit(wordsFilteredDF)
val word2VecDF = word2VecModel.transform(wordsFilteredDF)

word2VecModel.findSynonyms("developer", 5)
// +-----------+------------------+
// |       word|        similarity|
// +-----------+------------------+
// |        dev| 0.881394624710083|
// |development|0.7730562090873718|
// |       oier|0.6866029500961304|
// |  develover|0.6720684766769409|
// |     webdev|0.6582568883895874|
// +-----------+------------------+

ref:
https://spark.apache.org/docs/latest/ml-features.html#word2vec

Train a Logistic Regression model

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.Vectors

val training = spark.createDataFrame(Seq(
  (1.0, Vectors.dense(1.0, 2.5, 0.0, 0.0)),
  (1.0, Vectors.dense(0.1, 9.0, 0.0, 0.0)),
  (1.0, Vectors.dense(0.0, 0.0, 1.0, 0.0)),
  (0.0, Vectors.dense(0.0, 0.0, 2.0, 9.0)),
  (0.0, Vectors.dense(1.0, 0.0, 0.0, 5.0))
)).toDF("label", "features")

val lr = new LogisticRegression()
  .setMaxIter(100)
  .setRegParam(0.0)
  .setElasticNetParam(0.0)
  .setFamily("binomial")
  .setFeaturesCol("features")
  .setLabelCol("label")

lr.explainParams()

val lrModel = lr.fit(training)

println(s"Coefficients: ${lrModel.coefficients}")
// [2.0149015925419,2.694173163503675,9.547978766053463,-5.592221425156231]

println(s"Intercept: ${lrModel.intercept}")
// 8.552229795281482

// assuming `test` is a DataFrame with a "features" vector column
val result = lrModel.transform(test)

ref:
https://spark.apache.org/docs/latest/ml-classification-regression.html#logistic-regression
https://spark.apache.org/docs/latest/mllib-linear-methods.html#logistic-regression

Show the summary of a Logistic Regression model

import org.apache.spark.ml.classification.BinaryLogisticRegressionSummary

val binarySummary = lrModel.summary.asInstanceOf[BinaryLogisticRegressionSummary]
println(s"Area Under ROC: ${binarySummary.areaUnderROC}")

ref:
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.ml.classification.BinaryLogisticRegressionTrainingSummary

Evaluate a Binary Classification model

import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.linalg.Vectors

val df = spark.createDataFrame(Seq(
  (Vectors.dense(0.0, 2.5), 1.0), // correct
  (Vectors.dense(1.0, 4.1), 1.0), // correct
  (Vectors.dense(9.2, 1.1), 0.0), // correct
  (Vectors.dense(1.0, 0.1), 0.0), // correct
  (Vectors.dense(5.0, 0.5), 1.0)  // incorrect
)).toDF("rawPrediction", "starring")

val evaluator = new BinaryClassificationEvaluator()
  .setMetricName("areaUnderROC")
  .setRawPredictionCol("rawPrediction")
  .setLabelCol("starring")
val metric = evaluator.evaluate(df)
// 0.8333333333333333

ref:
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

Cross-validate a Pipeline model

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

val vectorAssembler = new VectorAssembler()
  .setInputCols(Array("feature1", "feature2", "feature3"))
  .setOutputCol("features")

val lr = new LogisticRegression()
  .setFeaturesCol("features")
  .setLabelCol("starring")

val pipeline = new Pipeline()
  .setStages(Array(vectorAssembler, lr))

val paramGrid = new ParamGridBuilder()
  .addGrid(lr.maxIter, Array(20, 100))
  .addGrid(lr.regParam, Array(0.0, 0.5, 1.0, 2.0))
  .addGrid(lr.elasticNetParam, Array(0.0, 0.5, 1.0))
  .build()

val evaluator = new BinaryClassificationEvaluator()
  .setMetricName("areaUnderROC")
  .setRawPredictionCol("rawPrediction")
  .setLabelCol("starring")

val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEstimatorParamMaps(paramGrid)
  .setEvaluator(evaluator)
  .setNumFolds(3)

val cvModel = cv.fit(trainingDF)

ref:
https://spark.apache.org/docs/latest/ml-tuning.html#cross-validation

Extract best parameters from a Cross-Validator model

import org.apache.spark.ml.PipelineModel
import org.apache.spark.ml.classification.LogisticRegressionModel

val bestPipelineModel = cvModel.bestModel.asInstanceOf[PipelineModel]
val lrModel = bestPipelineModel.stages(0).asInstanceOf[LogisticRegressionModel]
lrModel.extractParamMap()
// or
lrModel.explainParams()
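To see how every parameter combination fared, pair cvModel.avgMetrics with the candidate ParamMaps (a sketch; cvModel comes from the previous recipe):

```scala
// average cross-validation metric for each candidate ParamMap, best first
val paramsAndMetrics = cvModel.getEstimatorParamMaps
  .zip(cvModel.avgMetrics)
  .sortBy(-_._2)

paramsAndMetrics.foreach { case (paramMap, metric) =>
  println(s"$metric: $paramMap")
}
```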

ref:
https://stackoverflow.com/questions/31749593/how-to-extract-best-parameters-from-a-crossvalidatormodel

Create a custom Transformer

package ws.vinta.albedo.utils

import org.apache.spark.sql.types.{DataType, StructType}

object SchemaUtils {
  def checkColumnType(schema: StructType, colName: String, dataType: DataType): Unit = {
    val actualDataType = schema(colName).dataType
    require(actualDataType.equals(dataType), s"Column $colName must be of type $dataType but was actually $actualDataType.")
  }
}

package ws.vinta.albedo.preprocessors

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.{DoubleParam, IntParam, Param, ParamMap}
import org.apache.spark.ml.util.{DefaultParamsWritable, Identifiable}
import org.apache.spark.sql.types.{IntegerType, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import ws.vinta.albedo.utils.SchemaUtils.checkColumnType

import scala.collection.mutable

class NegativeGenerator(override val uid: String, val bcPopularItems: Broadcast[mutable.LinkedHashSet[Int]])
  extends Transformer with DefaultParamsWritable {

  def this(bcPopularItems: Broadcast[mutable.LinkedHashSet[Int]]) = {
    this(Identifiable.randomUID("negativeGenerator"), bcPopularItems)
  }

  val userCol = new Param[String](this, "userCol", "The column name of user ids")

  def getUserCol: String = $(userCol)

  def setUserCol(value: String): this.type = set(userCol, value)
  setDefault(userCol -> "user")

  val itemCol = new Param[String](this, "itemCol", "The column name of item ids")

  def getItemCol: String = $(itemCol)

  def setItemCol(value: String): this.type = set(itemCol, value)
  setDefault(itemCol -> "item")

  val labelCol = new Param[String](this, "labelCol", "The column name of labels")

  def getLabelCol: String = $(labelCol)

  def setLabelCol(value: String): this.type = set(labelCol, value)
  setDefault(labelCol -> "label")

  val negativeValue = new IntParam(this, "negativeValue", "The value used for negative samples")

  def getNegativeValue: Int = $(negativeValue)

  def setNegativeValue(value: Int): this.type = set(negativeValue, value)
  setDefault(negativeValue -> 0)

  val negativePositiveRatio = new DoubleParam(this, "negativePositiveRatio", "The ratio of negative samples to positive samples")

  def getNegativePositiveRatio: Double = $(negativePositiveRatio)

  def setNegativePositiveRatio(value: Double): this.type = set(negativePositiveRatio, value)
  setDefault(negativePositiveRatio -> 1.0)

  override def transformSchema(schema: StructType): StructType = {
    checkColumnType(schema, $(userCol), IntegerType)
    checkColumnType(schema, $(itemCol), IntegerType)
    checkColumnType(schema, $(labelCol), IntegerType)

    schema
  }

  override def transform(dataset: Dataset[_]): DataFrame = {
    transformSchema(dataset.schema)

    val popularItems: mutable.LinkedHashSet[Int] = this.bcPopularItems.value

    val emptyItemSet = new mutable.HashSet[Int]
    val addToItemSet = (itemSet: mutable.HashSet[Int], item: Int) => itemSet += item
    val mergeItemSets = (set1: mutable.HashSet[Int], set2: mutable.HashSet[Int]) => set1 ++= set2

    val getUserNegativeItems = (userItemsPair: (Int, mutable.HashSet[Int])) => {
      val (user, positiveItems) = userItemsPair
      val negativeItems = popularItems.diff(positiveItems)
      val requiredNegativeItemsCount = (positiveItems.size * this.getNegativePositiveRatio).toInt
      (user, negativeItems.slice(0, requiredNegativeItemsCount))
    }
    val expandNegativeItems = (userItemsPair: (Int, mutable.LinkedHashSet[Int])) => {
      val (user, negativeItems) = userItemsPair
      negativeItems.map({(user, _, $(negativeValue))})
    }

    import dataset.sparkSession.implicits._

    // TODO: this currently assumes the incoming dataset contains only positive samples;
    // we may need to handle datasets that already contain negative samples later
    val negativeDF = dataset
      .select($(userCol), $(itemCol))
      .rdd
      .map({
        case Row(user: Int, item: Int) => (user, item)
      })
      .aggregateByKey(emptyItemSet)(addToItemSet, mergeItemSets)
      .map(getUserNegativeItems)
      .flatMap(expandNegativeItems)
      .toDF($(userCol), $(itemCol), $(labelCol))

    dataset.select($(userCol), $(itemCol), $(labelCol)).union(negativeDF)
  }

  override def copy(extra: ParamMap): NegativeGenerator = {
    defaultCopy(extra)
  }
}

ref:
https://www.safaribooksonline.com/library/view/high-performance-spark/9781491943199/ch09.html#extending_spark_ml
https://www.youtube.com/watch?v=gCfVVrgWgxY
https://stackoverflow.com/questions/40615713/how-to-write-a-custom-transformer-in-mllib

Create a custom Evaluator

package ws.vinta.albedo.evaluators

import org.apache.spark.ml.evaluation.Evaluator
import org.apache.spark.ml.param.{Param, ParamMap}
import org.apache.spark.ml.util.{DefaultParamsWritable, Identifiable}
import org.apache.spark.mllib.evaluation.RankingMetrics
import org.apache.spark.sql.{DataFrame, Dataset, Row}

class RankingEvaluator(override val uid: String, val userActualItemsDF: DataFrame)
  extends Evaluator with DefaultParamsWritable {

  def this(userActualItemsDF: DataFrame) = {
    this(Identifiable.randomUID("rankingEvaluator"), userActualItemsDF)
  }

  val metricName = new Param[String](this, "metricName", "The evaluation metric")

  def getMetricName: String = $(metricName)

  def setMetricName(value: String): this.type = set(metricName, value)
  setDefault(metricName -> "ndcg@k")

  val k = new Param[Int](this, "k", "Only evaluate the ranking of the top k items")

  def getK: Int = $(k)

  def setK(value: Int): this.type = set(k, value)
  setDefault(k -> 15)

  override def isLargerBetter: Boolean = $(metricName) match {
    case "map" => true
    case "ndcg@k" => true
    case "precision@k" => true
  }

  override def evaluate(dataset: Dataset[_]): Double = {
    import dataset.sparkSession.implicits._

    val userPredictedItemsDF = dataset.select($"user_id", $"recommendations.repo_id".alias("items"))

    val bothItemsRDD = userPredictedItemsDF.join(userActualItemsDF, Seq("user_id"))
      .select(userPredictedItemsDF.col("items"), userActualItemsDF.col("items"))
      .rdd
      .map((row: Row) => {
        // Row(userPredictedItems, userActualItems)
        (row(0).asInstanceOf[Seq[Int]].toArray, row(1).asInstanceOf[Seq[Int]].toArray)
      })

    val rankingMetrics = new RankingMetrics(bothItemsRDD)
    val metric = $(metricName) match {
      case "map" => rankingMetrics.meanAveragePrecision
      case "ndcg@k" => rankingMetrics.ndcgAt($(k))
      case "precision@k" => rankingMetrics.precisionAt($(k))
    }
    metric
  }

  override def copy(extra: ParamMap): RankingEvaluator = {
    defaultCopy(extra)
  }
}

ref:
https://spark.apache.org/docs/latest/mllib-evaluation-metrics.html#ranking-systems
https://www.safaribooksonline.com/library/view/spark-the-definitive/9781491912201/ch19.html#s6c5---recommendation

Spark SQL cookbook (Scala)

Access SparkSession

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val conf = new SparkConf()
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

val spark = SparkSession.builder().config(conf).getOrCreate()

val sc = spark.sparkContext
sc.setLogLevel("WARN")

ref:
https://sparkour.urizone.net/recipes/understanding-sparksession/

Import implicits

val spark = SparkSession.builder().getOrCreate()

import spark.implicits._

// you could also access SparkSession via any Dataset or DataFrame
import someDS.sparkSession.implicits._
import someDF.sparkSession.implicits._

val ratingDF = rawDF
  .selectExpr("from_user_id AS user", "repo_id AS item", "1 AS rating", "starred_at")
  .orderBy($"user", $"starred_at".desc)

Read data from MySQL

import java.util.Properties

val dbUrl = "jdbc:mysql://mysql:3306/albedo?user=root&password=123&verifyServerCertificate=false&useSSL=false"
val prop = new Properties()
prop.put("driver", "com.mysql.jdbc.Driver")
val rawDF = spark.read.jdbc(dbUrl, "app_repostarring", prop)

ref:
https://docs.databricks.com/spark/latest/data-sources/sql-databases.html

Read data from an Apache Parquet file

import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, SparkSession}

import spark.implicits._

case class RepoStarring(
  user_id: Int,
  repo_id: Int,
  starred_at: java.sql.Timestamp,
  starring: Double
)

val dataDir = "."
val today = "20170820"

val savePath = s"$dataDir/spark-data/$today/repoStarringDF.parquet"
val df: DataFrame = try {
  spark.read.parquet(savePath)
} catch {
  case e: AnalysisException => {
    if (e.getMessage().contains("Path does not exist")) {
      // dbUrl and prop as defined in the "Read data from MySQL" recipe above
      val df = spark.read.jdbc(dbUrl, "app_repostarring", prop)
        .select("user_id", "repo_id", "starred_at")
        .withColumn("starring", lit(1.0))
      df.write.parquet(savePath)
      df
    } else {
      throw e
    }
  }
}
df.as[RepoStarring]

Read data from an Apache Avro file

$ spark-submit --packages "com.databricks:spark-avro_2.11:3.2.0"

import com.databricks.spark.avro._
import org.apache.spark.sql.SparkSession

implicit val spark = SparkSession
  .builder()
  .appName("Word2VecTrainer")
  .getOrCreate()

val df = spark.read.avro("./githubarchive_repo_info.avro")
df.show()

ref:
https://github.com/databricks/spark-avro

Create a DataFrame from a Seq

import java.sql.Timestamp
import org.apache.spark.sql.functions._

import spark.implicits._

val list = Seq(
  (1, 1, 1, "2017-05-16 20:01:00.0"),
  (1, 2, 1, "2017-05-17 21:01:00.0"),
  (2, 1, 0, "2017-05-18 22:01:00.0"),
  (3, 1, 1, "2017-05-10 22:01:00.0")
)
val df = list
  .toDF("user", "item", "star", "starred_at")
  .withColumn("starred_at", unix_timestamp(col("starred_at")).cast("timestamp"))
// you could override the schema
// val df = spark.createDataFrame(df.rdd, starringSchema)

case class Starring(user: Int, item: Int, star: Int, starred_at: Timestamp)
val ds = df.as[Starring]

df.printSchema()
// root
// |-- user: integer (nullable = false)
// |-- item: integer (nullable = false)
// |-- star: integer (nullable = false)
// |-- starred_at: timestamp (nullable = false)

Statistics

repoInfoDS.count()
// 16000

repoInfoDS
  .describe("language", "size", "stargazers_count", "forks_count", "subscribers_count", "open_issues_count")
  .show()
// +-------+--------+-----------------+-----------------+----------------+-----------------+-----------------+
// |summary|language|             size| stargazers_count|     forks_count|subscribers_count|open_issues_count|
// +-------+--------+-----------------+-----------------+----------------+-----------------+-----------------+
// |  count|   16000|            16000|            16000|           16000|            16000|            16000|
// |   mean|    null|    14777.8928125|      238.5754375|       61.164375|        20.802375|         9.896625|
// | stddev|    null|133926.8365289536|1206.220336641683|394.950236056925|84.09955924587845|56.72585435688847|
// |    min|   ANTLR|                0|                2|               0|                0|                0|
// |    max|    XSLT|          9427027|            56966|           28338|             4430|             3503|
// +-------+--------+-----------------+-----------------+----------------+-----------------+-----------------+

repoInfoDS.stat.corr("stargazers_count", "forks_count")
// 0.8408082958003276

repoInfoDS.stat.corr("stargazers_count", "size")
// 0.05351197356230549

repoInfoDS.stat.freqItems(Seq("language"), 0.1).show(truncate=false)
// +-----------------------------------------------------------+
// |language_freqItems                                         |
// +-----------------------------------------------------------+
// |[Ruby, Objective-C, C, Go, JavaScript, Swift, Java, Python]|
// +-----------------------------------------------------------+
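For skewed columns like stargazers_count, approxQuantile gives cheap percentiles (a sketch; the third argument is the relative error):

```scala
// median and 99th percentile of stargazers_count, within 1% relative error
val Array(median, p99) = repoInfoDS.stat.approxQuantile("stargazers_count", Array(0.5, 0.99), 0.01)
```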

ref:
https://databricks.com/blog/2015/06/02/statistical-and-mathematical-functions-with-dataframes-in-spark.html

Generate a schema from a case class

import java.sql.Timestamp
import org.apache.spark.sql.Encoders

case class Starring(user: Int, item: Int, star: Int, starred_at: Timestamp)
val starringSchema = Encoders.product[Starring].schema

ref:
https://stackoverflow.com/questions/36746055/generate-a-spark-structtype-schema-from-a-case-class

Change a DataFrame's schema

val newDF = spark.createDataFrame(oldDF.rdd, starringSchema)

Convert a DataFrame into a Dataset

import java.sql.Timestamp
import org.apache.spark.ml.feature.SQLTransformer

case class Starring(user: Int, item: Int, star: Int, starred_at: Timestamp)

val starringBuilder = new SQLTransformer()
val starringSQL = """
SELECT from_user_id AS user, repo_id AS item, 1 AS star, starred_at
FROM __THIS__
ORDER BY user, starred_at DESC
"""
starringBuilder.setStatement(starringSQL)
val starringDS = starringBuilder.transform(rawDF).as[Starring]

Convert a Dataset into a PairRDD

val pairRDD = dataset
  .select("user", "item")
  .rdd
  .map(row => (row(0), row(1)))

// or

case class UserItemPair(user: Int, item: Int)

val pairRDD = dataset
  .select("user", "item").as[UserItemPair]
  .rdd
  .map(row => (row.user, row.item))

// or

import org.apache.spark.sql.Row

val pairRDD = dataset
  .select("user", "item")
  .rdd
  .map({
    case Row(user: Int, item: Int) => (user, item)
  })

ref:
https://stackoverflow.com/questions/30655914/map-rdd-to-pairrdd-in-scala

Convert a Dataset into a LinkedHashSet

import scala.collection.mutable

val popularItems: mutable.LinkedHashSet[Int] = dataset
  .select("item")
  .rdd
  .map(r => r(0).asInstanceOf[Int])
  .collect()
  .to[mutable.LinkedHashSet]

println(popularItems.mkString(", "))

ref:
https://stackoverflow.com/questions/32000646/extract-column-values-of-dataframe-as-list-in-apache-spark

Add a column to a DataFrame

import org.apache.spark.sql.functions.lit

val newDF = df.withColumn("starring", lit(1))

Combine two columns into an Array

import org.apache.spark.sql.functions._

fixRepoInfoDS
  .where("topics != ''")
  .withColumn("tags", struct("language", "topics"))
  .select("tags")
  .show(truncate=false)
// +----------------------------------------------------------------------+
// |tags                                                                  |
// +----------------------------------------------------------------------+
// |[Ruby,aasm,activerecord,mongoid,ruby,state-machine,transition]        |
// |[Ruby,captcha,rails,recaptcha,ruby,sinatra]                           |
// |[Python,commandline,gtd,python,sqlite,todo]                           |
// ...
// +----------------------------------------------------------------------+
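struct() actually builds a StructType column; if you want a true ArrayType column, functions.array works when the combined columns share one type (both strings here):

```scala
import org.apache.spark.sql.functions.array

val tagsDF = fixRepoInfoDS
  .where("topics != ''")
  .withColumn("tags", array("language", "topics"))
  .select("tags")
```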

Select a nested column directly

userRecommendationsDF.show(truncate=false)
// +----+-----------------------------------------------------------------+
// |user|recommendations                                                  |
// +----+-----------------------------------------------------------------+
// |1   |[[2,0.9997897], [4,0.9988761], [1,0.9988487], [5,0.0]]           |
// |3   |[[5,0.9984464], [1,2.9802322E-8], [2,0.0], [4,0.0]]              |
// |2   |[[4,0.9921428], [2,0.10759391], [1,0.10749264], [5,1.4901161E-8]]|
// +----+-----------------------------------------------------------------+

userRecommendationsDF.printSchema()
// root
// |-- user: integer (nullable = false)
// |-- recommendations: array (nullable = true)
// |    |-- element: struct (containsNull = true)
// |    |    |-- item: integer (nullable = true)
// |    |    |-- rating: float (nullable = true)

val userItemsDF = userRecommendationsDF.select($"user", $"recommendations.item")
// +----+------------+
// |user|        item|
// +----+------------+
// |   1|[2, 4, 1, 5]|
// |   3|[5, 1, 2, 4]|
// |   2|[4, 2, 1, 5]|
// +----+------------+

ref:
https://stackoverflow.com/questions/38413040/spark-sql-udf-with-complex-input-parameter

Handle missing values (NaN and Null)

`NaN` stands for "Not a Number"; it's usually the result of a mathematical operation that doesn't make sense, e.g. 0.0/0.0.

val df1 = repoInfoDF.na.fill("")
val df2 = repoInfoDF.na.fill("", Seq("description", "homepage"))
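The same na functions also take numbers for numeric columns, and na.drop removes rows instead (column names assumed from the statistics recipe):

```scala
// replace null/NaN in every numeric column with 0
val df3 = repoInfoDF.na.fill(0)

// drop any row that has a null/NaN in the listed columns
val df4 = repoInfoDF.na.drop(Seq("stargazers_count", "forks_count"))
```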

ref:
https://stackoverflow.com/questions/33089781/spark-dataframe-handing-empty-string-in-onehotencoder
https://stackoverflow.com/questions/43882699/which-differences-there-are-between-null-and-nan-in-spark-how-to-deal-with

Create a User-Defined Function (UDF) which accepts one column

import org.apache.spark.sql.functions.udf

val df = Seq(
  (1, "Company Name Inc."),
  (2, "Company Name Co."), 
  (3, "Company Name ")
).toDF("id", "company")

val removeUninformativeWords = (text: String) => {
  // replaceAll() takes a regex, so literal dots must be escaped
  text
    .toLowerCase()
    .replaceAll(", inc\\.", "")
    .replaceAll(" inc\\.", "")
    .replaceAll("-inc", "")
    .replaceAll(" co\\., ltd\\.", "")
    .replaceAll(" co\\.", "")
    .replaceAll(" ltd\\.", "")
    .replaceAll("\\.com", "")
    .replaceAll("@", "")
    .trim()
}
val removeUninformativeWordsUDF = udf(removeUninformativeWords)

val newDF = df.withColumn("fix_company", removeUninformativeWordsUDF($"company"))
newDF.show()

ref:
https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-sql-udfs.html

Create a User-Defined Function (UDF) which accepts multiple columns

When defining a UDF whose input is a StructType column, you must declare the argument as Row; custom case classes won't work.

import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.Row

userRecommendationsDF.printSchema()
// root
// |-- user: integer (nullable = false)
// |-- recommendations: array (nullable = true)
// |    |-- element: struct (containsNull = true)
// |    |    |-- item: integer (nullable = true)
// |    |    |-- rating: float (nullable = true)

val extractItemsUDF = udf((user: Int, recommendations: Seq[Row]) => {
  // access struct fields by position: each Row is (item: Int, rating: Float)
  val items: Seq[Int] = recommendations.map(_.getInt(0))
  val ratings: Seq[Float] = recommendations.map(_.getFloat(1))
  items
})
userRecommendationsDF.withColumn("items", extractItemsUDF($"user", $"recommendations"))
// +----+-----------------------------------------------------------------+------------+
// |user|recommendations                                                  |items       |
// +----+-----------------------------------------------------------------+------------+
// |1   |[[2,0.9997897], [4,0.9988761], [1,0.9988487], [5,0.0]]           |[2, 4, 1, 5]|
// |3   |[[5,0.9984464], [1,2.9802322E-8], [2,0.0], [4,0.0]]              |[5, 1, 2, 4]|
// |2   |[[4,0.9921428], [2,0.10759391], [1,0.10749264], [5,1.4901161E-8]]|[4, 2, 1, 5]|
// +----+-----------------------------------------------------------------+------------+

ref:
https://stackoverflow.com/questions/32196207/derive-multiple-columns-from-a-single-column-in-a-spark-dataframe

The "GenericRowWithSchema cannot be cast to XXX" issue:

There is a strict mapping between Spark SQL data types and Scala types: IntegerType maps to Int, TimestampType to java.sql.Timestamp, and StructType to org.apache.spark.sql.Row.

ref:
https://stackoverflow.com/questions/38413040/spark-sql-udf-with-complex-input-parameter
https://spark.apache.org/docs/latest/sql-programming-guide.html#data-types

Query

userInfoDF
  .where("company LIKE '%Inc%'")
  .orderBy($"company".desc)

Group by

import org.apache.spark.sql.functions.count

val df = userInfoDF
  .groupBy($"company")
  .agg(count("user_id").alias("count"))
  .orderBy($"count".desc)
  .limit(1000)

Join two DataFrames

val starringDF = spark.createDataFrame(Seq(
    (652070, 21289110, 1),
    (652070, 4164482, 1),
    (652070, 44838949, 0),
    (1912583, 4164482, 0)
)).toDF("user_id", "repo_id", "starring")

val repoDF = spark.createDataFrame(Seq(
    (21289110, "awesome-python", "Python", 37336),
    (4164482, "django", "Python", 27451),
    (44838949, "swift", "C++", 39840)
)).toDF("id", "repo_name", "repo_language", "stargazers_count")

val fullDF = starringDF.join(repoDF, starringDF.col("repo_id") === repoDF.col("id"))
// +-------+--------+--------+--------+--------------+-------------+----------------+
// |user_id| repo_id|starring|      id|     repo_name|repo_language|stargazers_count|
// +-------+--------+--------+--------+--------------+-------------+----------------+
// | 652070|21289110|       1|21289110|awesome-python|       Python|           37336|
// | 652070| 4164482|       1| 4164482|        django|       Python|           27451|
// | 652070|44838949|       0|44838949|         swift|          C++|           39840|
// |1912583| 4164482|       0| 4164482|        django|       Python|           27451|
// +-------+--------+--------+--------+--------------+-------------+----------------+

ref:
https://docs.databricks.com/spark/latest/faq/join-two-dataframes-duplicated-column.html

Spark notebook on Databricks

Use SQL to query a DataFrame

repoInfoDS.createOrReplaceTempView("repoinfo")
%sql CACHE TABLE repoinfo;
%sql SELECT language, count(id) AS total FROM repoinfo GROUP BY language ORDER BY total DESC;

ref:
https://docs.databricks.com/spark/latest/spark-sql/index.html

Mount an Amazon S3 bucket

val accessKey = "XXX"
val secretKey = "XXX".replace("/", "%2F")
val s3BucketName = "files.albedo.one"
val mountPath = "/mnt/albedo_s3/"

dbutils.fs.mount(s"s3a://$accessKey:$secretKey@$s3BucketName", mountPath)

ref:
https://docs.databricks.com/spark/latest/data-sources/amazon-s3.html

Run shell commands

%sh ls
%sh cd /mnt/ && ls

ref:
https://docs.databricks.com/user-guide/notebooks/index.html

Common Feature Engineering Methods

Feature engineering is a craft; it rewards creativity.

This article is updated from time to time.

Missing Value Imputation

The simplest, most brute-force approach is to drop every row that contains a missing value.

For numerical features, missing values can be replaced with:

  • 0: the downside is that it may be confused with values that are genuinely 0
  • -999: any value that never occurs under normal circumstances; a poor choice may turn into an outlier that needs special treatment
  • Mean
  • Median: unlike the mean, it is not distorted by outliers

For categorical features, missing values can be replaced with:

  • Mode: the most frequent value
  • A catch-all value such as "Others"

Suppose you want to impute the age feature and you also have a feature such as gender: you can compute the mean, median, and mode of age separately for men and women and impute from those. A more sophisticated approach is to take the rows without missing values, train a regression or classification model on them, and use that model to predict the missing values.

Some algorithms can tolerate missing values, in which case it may be worth adding a has_missing_value column (known as an NA indicator column).
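
In Spark ML (2.2+), the mean / median strategies are covered by the `Imputer` transformer; a minimal sketch, assuming a numeric (Double) `age` column with missing values:

```scala
import org.apache.spark.ml.feature.Imputer

val imputer = new Imputer()
  .setInputCols(Array("age"))
  .setOutputCols(Array("age_imputed"))
  .setStrategy("median") // or "mean"

val imputedDF = imputer.fit(df).transform(df)
```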

ref:
http://adataanalyst.com/machine-learning/comprehensive-guide-feature-engineering/
https://stats.stackexchange.com/questions/28860/why-adding-an-na-indicator-column-instead-of-value-imputation-for-randomforest

Outlier Detection

The most intuitive way to spot outliers is to plot the data: a box plot for a single feature, a scatter plot for a pair of features.

Outliers are usually either dropped or transformed (e.g. with a log transformation or binning); the missing-value techniques above also apply.
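
A common non-graphical rule of thumb is the 1.5 x IQR fence, which is easy to express with Spark's `approxQuantile`; the `price` column here is hypothetical:

```scala
// 25th / 75th percentiles (relativeError = 0.0 asks for exact quantiles)
val Array(q1, q3) = df.stat.approxQuantile("price", Array(0.25, 0.75), 0.0)
val iqr = q3 - q1

// keep only the rows inside the 1.5 * IQR fences
val cleanedDF = df.where($"price".between(q1 - 1.5 * iqr, q3 + 1.5 * iqr))
```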

ref:
https://www.analyticsvidhya.com/blog/2016/01/guide-data-exploration/
https://www.douban.com/note/413022836/

Duplicate Entries Removal

"Duplicate" or "redundant" especially refers to rows whose features are identical but whose target variable differs.

Feature Scaling

Standardization

In raw data, features carry different meanings and units, so their ranges can differ wildly: a binary feature takes the values 0 or 1, while a price feature might span [0, 1000000]. When the ranges differ that much, the model may lean towards the feature with the larger range. The remedy is to bring features of different scales onto the same scale, which is called standardization or normalization.

In the narrow sense, standardization specifically means computing the z-score so that the data has mean 0 and variance 1.
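
In Spark ML this is `StandardScaler`, which operates on an assembled vector column; a sketch with hypothetical input columns:

```scala
import org.apache.spark.ml.feature.{StandardScaler, VectorAssembler}

val assembler = new VectorAssembler()
  .setInputCols(Array("price", "stargazers_count"))
  .setOutputCol("features")
val assembledDF = assembler.transform(df)

val scaler = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("scaled_features")
  .setWithMean(true) // subtract the mean
  .setWithStd(true)  // divide by the standard deviation

val scaledDF = scaler.fit(assembledDF).transform(assembledDF)
```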

ref:
https://spark.apache.org/docs/latest/ml-features.html#standardscaler
http://scikit-learn.org/stable/modules/preprocessing.html#standardization-or-mean-removal-and-variance-scaling

Normalization

Normalization scales each sample to unit norm (each sample's norm becomes 1), which suits computing dot products or similarities between samples. Besides standardization and normalization, there is also min-max scaling, which maps the data into the [0, 1] or [-1, 1] range; it is easily thrown off by outliers, though.

Standardization works on one feature at a time (per column); normalization works on each observation (per row).

SVM, logistic regression, and other algorithms that use a squared loss function need standardization; Vector Space Models need normalization; tree-based algorithms are insensitive to scale and generally need neither.
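
Spark ML's `Normalizer` does the per-row scaling; a sketch assuming an already-assembled `features` vector column:

```scala
import org.apache.spark.ml.feature.Normalizer

val normalizer = new Normalizer()
  .setInputCol("features")
  .setOutputCol("normalized_features")
  .setP(2.0) // L2 norm; use 1.0 for the L1 norm

val normalizedDF = normalizer.transform(df)
```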

ref:
https://spark.apache.org/docs/latest/ml-features.html#normalizer
http://scikit-learn.org/stable/modules/preprocessing.html#normalization
https://www.qcloud.com/community/article/689521

Feature Transformation

The following apply to continuous features:

Rounding

For features that are precise down to the n-th decimal place, if you do not actually need that precision, consider round(feature * m); you can even treat the rounded value as a categorical feature.

confidence  round(confidence * 10)
0.9594      10
0.1254      1
0.1854      2
0.5454      5
0.3655      4

Log Transformation

Because log(x) grows ever more slowly as x grows, taking the log compresses large values and expands small ones; in other words, it squeezes the "long tail" and stretches out the "head". If x originally ranges over [100, 1000], log(x, 10) maps it to [2, 3]. log(1 + x) and log(x / (1 - x)) are also commonly used.

A similar trick is the square root, or the cube root (which also works on negative numbers).
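
In Spark these are one-liners with the built-in SQL functions; `amount` is a hypothetical column:

```scala
import org.apache.spark.sql.functions.{cbrt, log1p, sqrt}

val transformedDF = df
  .withColumn("amount_log1p", log1p($"amount")) // log(1 + x)
  .withColumn("amount_sqrt", sqrt($"amount"))   // square root
  .withColumn("amount_cbrt", cbrt($"amount"))   // cube root, also works on negatives
```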

ref:
https://www.safaribooksonline.com/library/view/mastering-feature-engineering/9781491953235/ch02.html

Binarization

Set a threshold on a numeric feature: values above it become 1, values below become 0. For example with score, if all you care about is "pass" versus "fail", you can map it straight to 1 (score >= 60) and 0 (score < 60). Or, for a beer sales analysis, you can add an age >= 18 feature to flag adults.

Likewise, if you have a categorical color feature and do not care which color it actually is, you can turn it into has_color.
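
Spark ML's `Binarizer` implements exactly this thresholding (the input column must be Double):

```scala
import org.apache.spark.ml.feature.Binarizer

val binarizer = new Binarizer()
  .setInputCol("score")
  .setOutputCol("passed")
  .setThreshold(60.0) // values strictly greater than the threshold become 1.0

val binarizedDF = binarizer.transform(df)
```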

ref:
https://spark.apache.org/docs/latest/ml-features.html#binarizer

Binning

Also known as bucketization.

Take a feature such as age: you can split the ages into n ranges, e.g. 0-20, 20-40, 40-60 or 0-18, 18-40, 40-70 (equal-width or equal-frequency), and map each age to its range; if 26 falls into the second bucket, the new feature value is 2. This variant sets the bucket boundaries by hand. The other variant splits according to the data's distribution, called quantization or quantile binning, where you only specify the number of buckets.

The same idea applies elsewhere: a datetime feature can be binned into morning, noon, afternoon, and evening. For a categorical feature, you can first run SELECT count(*) ... GROUP BY and change the values that occur fewer times than some threshold into "Other". Or, if you have an occupation feature and do not need precise job titles, you can collapse values such as "Web Developer", "iOS Developer", and "DBA" into "Software Engineer".

Binarization and binning both discretize continuous features, which strengthens a model's non-linear generalization ability.
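
A sketch of both variants in Spark ML: hand-picked boundaries with `Bucketizer`, quantile binning with `QuantileDiscretizer` (the input column must be Double):

```scala
import org.apache.spark.ml.feature.{Bucketizer, QuantileDiscretizer}

// variant 1: hand-picked bucket boundaries
val bucketizer = new Bucketizer()
  .setInputCol("age")
  .setOutputCol("age_bucket")
  .setSplits(Array(0.0, 18.0, 40.0, 70.0, Double.PositiveInfinity))
val bucketedDF = bucketizer.transform(df)

// variant 2: quantile binning, only the number of buckets is specified
val discretizer = new QuantileDiscretizer()
  .setInputCol("age")
  .setOutputCol("age_bin")
  .setNumBuckets(4)
val binnedDF = discretizer.fit(df).transform(df)
```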

ref:
https://spark.apache.org/docs/latest/ml-features.html#bucketizer
https://spark.apache.org/docs/latest/ml-features.html#quantilediscretizer
https://github.com/collectivemedia/spark-ext#optimal-binning
https://www.qcloud.com/community/article/689521

The following apply to categorical features:

ref:
https://en.wikipedia.org/wiki/Categorical_variable

Integer Encoding

This usually means converting strings directly into numbers. One approach maps values to 0, 1, 2, 3, 4 arbitrarily; another assigns numbers by frequency, so the most frequent value gets 0, the next ones 1, 2, 3, and so on. For categorical features with some inherent order (called ordinal), such as "diamond member", "platinum member", "gold member", and "regular member", a direct mapping to numbers is usually fine; for features with no obvious ordering, such as color or city, one-hot encoding is more appropriate. With tree-based algorithms it does not matter either way.

Some categorical features are themselves represented as numbers (e.g. id). The difference from continuous features is that, for categorical features, the difference or magnitude of the numbers carries little meaning.

ref:
http://breezedeus.github.io/2014/11/15/breezedeus-feature-processing.html
http://phunters.lofter.com/post/86d56_194e956

One-hot Encoding (OHE)

If a feature has m distinct values (e.g. Taipei, Beijing, Tokyo), one-hot encoding turns it into a vector of length m:

city    city_Taipei city_Beijing city_Tokyo
Taipei  1           0            0
Beijing 0           1            0
Tokyo   0           0            1

You can also use dummy coding instead, which only needs a vector of length m - 1:

city    city_Taipei city_Beijing
Taipei  1           0
Beijing 0           1
Tokyo   0           0
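
In Spark ML this is StringIndexer followed by OneHotEncoder; note that OneHotEncoder's default setDropLast(true) gives exactly the dummy-coding (length m - 1) variant. A sketch against a hypothetical `city` column:

```scala
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}

val indexer = new StringIndexer()
  .setInputCol("city")
  .setOutputCol("city_index")
val indexedDF = indexer.fit(df).transform(df)

val encoder = new OneHotEncoder()
  .setInputCol("city_index")
  .setOutputCol("city_ohe")
  .setDropLast(false) // true (the default) gives dummy coding with m - 1 slots
// on Spark 3.x, OneHotEncoder is an estimator: encoder.fit(indexedDF).transform(indexedDF)
val encodedDF = encoder.transform(indexedDF)
```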

ref:
http://scikit-learn.org/stable/modules/preprocessing.html#preprocessing-categorical-features

Count Vectorization

Besides text features, this also works on comma-separated categorical features. Take genre, whose values look like Action,Sci-Fi,Drama: you can first turn them into Array("action", "sci-fi", "drama") with RegexTokenizer, then into a vector with CountVectorizer.
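
A sketch of that genre pipeline, with made-up data:

```scala
import org.apache.spark.ml.feature.{CountVectorizer, RegexTokenizer}

val df = Seq((1, "Action,Sci-Fi,Drama"), (2, "Drama,Romance")).toDF("movie_id", "genre")

// split on commas (RegexTokenizer lowercases by default)
val tokenizer = new RegexTokenizer()
  .setInputCol("genre")
  .setOutputCol("genres")
  .setPattern(",")
val tokenizedDF = tokenizer.transform(df)

// turn the token arrays into count vectors
val vectorizer = new CountVectorizer()
  .setInputCol("genres")
  .setOutputCol("genres_vector")
val vectorizedDF = vectorizer.fit(tokenizedDF).transform(tokenizedDF)
```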

ref:
https://spark.apache.org/docs/latest/ml-features.html#countvectorizer

Feature Hashing

Take user id as an example: a hash function maps each user id to one of (hashed_1, hashed_2, ..., hashed_m). Because m << the range of user id, the downside is collisions (which you may be able to ignore if your model is robust enough); the upside is that it handles unseen and rare values gracefully.

import hashlib

def hash_func(s, n_bins=100000):
    # map any string into one of the buckets 1 .. n_bins - 1
    s = s.encode('utf-8')
    return int(hashlib.md5(s).hexdigest(), 16) % (n_bins - 1) + 1

print(hash_func('some categorical value'))

ref:
https://www.slideshare.net/gabrielspmoreira/feature-engineering-getting-most-out-of-data-for-predictive-models-tdc-2017/42

Bin-counting

In computational advertising, for example, if you have each user's ad impressions (clicked and not clicked) and ad clicks, you can compute each user's click-through rate and represent the user by that probability; conversely, the same trick applies to ad id.

ad_id   ad_views  ad_clicks  ad_ctr
412533  18339     1355       0.074
423334  335       12         0.036
345664  1244      132        0.106
349833  35387     1244       0.035

ref:
https://blogs.technet.microsoft.com/machinelearning/2015/02/17/big-learning-made-easy-with-counts/

A different angle: if you have a brand feature, you can mine users' purchase history and find, say, that among buyers of brand A, 70% also buy brand B and 40% also buy brand C, while among buyers of brand D, 10% also buy brands A and E. Each brand can then be represented like this:

brand  A    B    C    D    E
A      1.0  0.7  0.4  0.0  0.0
B      ...
C      ...
D      0.1  0.0  0.0  0.0  0.1
E      ...

ref:
http://phunters.lofter.com/post/86d56_194e956

LabelCount Encoding

Similar to bin-counting, it likewise exploits existing counts or other statistics; the difference is that LabelCount Encoding ends up using the rank instead of the value itself. The advantage is insensitivity to outliers.

ad_id   ad_clicks  ad_rank
412533  1355       1
423334  12         4
345664  132        3
349833  1244       2

ref:
https://www.slideshare.net/gabrielspmoreira/feature-engineering-getting-most-out-of-data-for-predictive-models-tdc-2017/47

Mean Encoding

ref:
https://zhuanlan.zhihu.com/p/26308272

Category Embedding

ref:
https://arxiv.org/abs/1604.06737

User Profile

Represent each user id with a user profile, e.g. the user's age, gender, occupation, income, location, and preferred tags, turning each user into a feature vector. Beyond single-dimension features, you can also build features such as "which genres the songs the user listened to belong to" or "which categories the articles the user browsed (in the last 30 days) fall into", expressed with TF-IDF. Or take the average of the vectors of all the articles a user liked as the user's profile: for a user who often reads articles about recommender systems, the weights of "CB", "CF", and "recommender systems" in the profile will be high.

ref:
https://mp.weixin.qq.com/s/w87-dyG9Ap9xJ_HZu0Qn-w
https://medium.com/unstructured/how-feature-engineering-can-help-you-do-well-in-a-kaggle-competition-part-i-9cc9a883514d

Rare Categorical Variables

First count the occurrences of every category, then change the categories below some threshold into a value such as "Others"; a clustering algorithm can serve the same goal. You can also simply add a new binary feature, say rare, to flag the relatively uncommon data points.
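
A sketch of the count-then-replace approach in Spark, using when / otherwise; the threshold and column names are made up:

```scala
import org.apache.spark.sql.functions.{count, when}

val threshold = 10

// occurrences per category
val countsDF = df.groupBy($"repo_language").agg(count("*").alias("language_count"))

// rewrite the rare categories as "Others"
val cleanedDF = df
  .join(countsDF, Seq("repo_language"))
  .withColumn("repo_language_cleaned",
    when($"language_count" < threshold, "Others").otherwise($"repo_language"))
```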

Unseen Categorical Variables

Take Spark ML as an example: after fitting a StringIndexer (and OneHotEncoder) on the training set and applying it to the test set, there is a fair chance that some categorical values appear only in the test set. To a transformer that has only seen the training set, these are so-called unseen values.

There are a few common ways to deal with unseen values:

  • Encode the categorical features on the whole training set + test set
  • Simply drop the rows that contain unseen values
  • Change unseen values into a known value such as "Others"; StringIndexer.setHandleInvalid("keep") is essentially this approach

If you take the first option, you will still inevitably hit unseen values once the transformer is deployed to production. Online feature engineering usually works differently anyway: the features of each user or item are precomputed (refreshed periodically or triggered when the data is generated) and stored in a NoSQL store such as Redis, keyed by id; when the model needs them, it fetches the ready-made feature vector directly by user id / item id.

ref:
https://stackoverflow.com/questions/34681534/spark-ml-stringindexer-handling-unseen-labels

Large Categorical Variables

For very large categorical features (e.g. id-like features), if you are using logistic regression, you can actually brute-force one-hot encoding; otherwise, use the feature hashing or bin-counting techniques mentioned above. With GBDT you can even feed the raw ids in directly, as long as there are enough trees.

ref:
https://www.zhihu.com/question/34819617

Feature Construction

Feature construction means manually creating new features out of the existing ones, usually to compensate for an ordinary linear model's inability to learn non-linear patterns.

If you have plenty of user purchase data, besides aggregating it into a feature such as total spend, you can also slice it into spend in last week, spend in last month, and spend in last year: features that capture a trend.

Examples:

  • author_avg_page_view: the average page view count of all articles by this article's author
  • user_visited_days_since_doc_published: how many days after the article was published the user visited it
  • user_history_doc_sim_categories: the TF-IDF similarity between the categories of all articles the user has read and this article's categories
  • user_history_doc_sim_topics: the TF-IDF similarity between the body text of all articles the user has read and this article's body text

ref:
https://medium.com/unstructured/how-feature-engineering-can-help-you-do-well-in-a-kaggle-competition-part-i-9cc9a883514d
https://www.safaribooksonline.com/library/view/large-scale-machine/9781785888748/ch04s02.html

Datetime Features

For date / time data, besides converting to a timestamp and extracting day, month, and year into new columns, you can bin the hour (morning, noon, evening, and so on) or bin the day (weekday versus weekend); you can also look up the weather, holidays, and similar information for that date.

Spatial Features

If you have features such as city or address, you can derive latitude and longitude, and from those compose features like median_income_within_2_miles.

Feature Interaction

Given two continuous features A and B, you can build new features as A + B, A - B, A * B, or A / B. For example, house_age_at_purchase = house_purchase_date - house_built_date or click_through_rate = n_clicks / n_impressions.

A similar technique is Polynomial Expansion: with degree 2, the two features (x, y) become the five features (x, x * x, y, x * y, y * y).
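
Spark ML ships a PolynomialExpansion transformer for the latter; a sketch assuming an already-assembled `features` vector column:

```scala
import org.apache.spark.ml.feature.PolynomialExpansion

val expander = new PolynomialExpansion()
  .setInputCol("features")
  .setOutputCol("expanded_features")
  .setDegree(2) // (x, y) -> (x, x * x, y, x * y, y * y)

val expandedDF = expander.transform(df)
```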

ref:
https://spark.apache.org/docs/latest/ml-features.html#polynomialexpansion
https://elitedatascience.com/feature-engineering-best-practices

Feature Combination

Also known as feature crossing.

Feature combination mainly targets categorical features, while feature interaction suits continuous features, but the idea is the same: combine two or more features in some way into a new feature, usually to compensate for an ordinary linear model's inability to learn non-linear patterns.

Suppose you have the features gender and wealth, with 2 and 3 possible values respectively. The simplest combination is plain string concatenation into a new gender_wealth feature with 2 x 3 = 6 possible values. Since it is categorical, gender_wealth can go straight through StringIndexer and OneHotEncoder. You can also combine a continuous and a categorical feature, e.g. age_wealth, except that the vector then holds age itself instead of 0/1.

Let C be a categorical feature and N a continuous one; the following combinations are often meaningful:

  • median(N) GROUP BY C
  • mean(N) GROUP BY C
  • mode(N) GROUP BY C
  • min(N) GROUP BY C
  • max(N) GROUP BY C
  • std(N) GROUP BY C
  • var(N) GROUP BY C
  • N - median(N) GROUP BY C

user_id  age  gender  wealth  gender_wealth  gender_wealth_ohe   age_wealth
1        56   male    rich    male_rich      [1, 0, 0, 0, 0, 0]  [56, 0, 0]
2        30   male    middle  male_middle    [0, 1, 0, 0, 0, 0]  [0, 30, 0]
3        19   female  rich    female_rich    [0, 0, 0, 1, 0, 0]  [19, 0, 0]
4        62   female  poor    female_poor    [0, 0, 0, 0, 0, 1]  [0, 0, 62]
5        78   male    poor    male_poor      [0, 0, 1, 0, 0, 0]  [0, 0, 78]
6        34   female  middle  female_middle  [0, 0, 0, 0, 1, 0]  [0, 34, 0]
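
The string-concatenation cross itself is a one-liner with concat_ws:

```scala
import org.apache.spark.sql.functions.concat_ws

// "male" + "rich" -> "male_rich", ready for StringIndexer / OneHotEncoder
val crossedDF = df.withColumn("gender_wealth", concat_ws("_", $"gender", $"wealth"))
```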

ref:
http://breezedeus.github.io/2014/11/15/breezedeus-feature-processing.html
http://phunters.lofter.com/post/86d56_194e956
https://zhuanlan.zhihu.com/p/26444240
http://blog.csdn.net/mytestmy/article/details/40933235

Feature Extraction

  • Principal Component Analysis (PCA)
  • Latent Dirichlet Allocation (LDA)

Feature Selection

Feature selection means automatically picking the useful features out of all the available features by some method.

ref:
http://scikit-learn.org/stable/modules/feature_selection.html

Filter Method

Pick some evaluation metric (variance, correlation, information gain, and so on) and measure each feature's relationship with the target variable in isolation; a common choice is the Chi-Square test. No model takes part in this kind of feature selection.

ref:
https://spark.apache.org/docs/latest/ml-features.html#chisqselector
http://files.cnblogs.com/files/XBWer/%E6%9C%BA%E5%99%A8%E5%AD%A6%E4%B9%A0%E3%81%AE%E7%89%B9%E5%BE%81.pdf

Wrapper Method

Use a model that predicts your target variable and treat feature selection as a combinatorial optimization problem: search for the subset of features that yields the best model evaluation. The drawback is that it is far too time-consuming, so it is rarely used in practice.

ref:
http://www.cnblogs.com/heaad/archive/2011/01/02/1924088.html

Embedded Method

This usually relies on an algorithm that assigns coefficients or importances to features, such as Logistic Regression (especially with an L1 penalty) or GBDT; sort all features by weight or importance and take the top n as the feature subset.

ref:
http://scikit-learn.org/stable/modules/feature_selection.html#feature-selection-using-selectfrommodel
https://www.zhihu.com/question/28641663

Feature Learning

Also known as Representation Learning or Automated Feature Engineering.

  • GBDT
  • Neural Network: Restricted Boltzmann Machines
  • Deep Learning: Autoencoder

ref:
https://zhuanlan.zhihu.com/p/26444240