XGBoost4j-Spark in Practice

Posted by Schoko on March 28, 2020

Preface

I recently used XGBoost4j-Spark for the first time on a project and hit quite a few pitfalls; my daily routine became digging through the Spark, XGBoost, and Scala API docs. This post is a short summary of the problems I ran into and how I solved them.

Prerequisites

Scala:

https://www.scala-lang.org

Spark:

https://spark.apache.org/docs/2.3.0/

XGBoost4j-Spark:

https://xgboost.readthedocs.io/en/release_0.82/jvm/xgboost4j_spark_tutorial.html

Versions used in this post:

  • Build tool: Maven

  • Scala: 2.11.8

  • Spark: 2.3.1

  • XGBoost4j-Spark: 0.82

Alright, let's get started.

Configuring dependencies

<dependencies>
        <!-- Spark-core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.3.1</version>
        </dependency>
        <!-- SparkSQL -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.3.1</version>
        </dependency>
        <!-- SparkSQL  ON  Hive-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>2.3.1</version>
        </dependency>
        <!-- Scala libraries -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.7</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-compiler</artifactId>
            <version>2.11.7</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-reflect</artifactId>
            <version>2.11.7</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/junit/junit -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-mllib-local -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.11</artifactId>  <!--spark-mllib-local_2.11-->
            <version>2.3.1</version>
        </dependency>
        <dependency>
            <groupId>ml.dmlc</groupId>
            <artifactId>xgboost4j</artifactId>
            <version>0.82</version>
        </dependency>
        <dependency>
            <groupId>ml.dmlc</groupId>
            <artifactId>xgboost4j-spark</artifactId>
            <version>0.82</version>
        </dependency>

</dependencies>

Data loading and preprocessing

Data loading

Once the packaged jar is copied to a machine with access to the Spark cluster, the job runs easily, so reading the model's training data from Hive is also straightforward:

val spark = SparkSession.builder().enableHiveSupport().appName("CtrXgboost").getOrCreate()
val df = spark.table(inputTable).select("*").orderBy(rand(2020))  // limit(1000)

df is a Spark DataFrame, and orderBy(rand(seed)) performs a row shuffle on it.

Data preprocessing

Get the sparseFeatures (categorical) and denseFeatures (numerical) columns:

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.StringType

def getEncCols(inputDF: DataFrame): (Array[String], Array[String]) = {
    val textCols = new ArrayBuffer[String]()
    val res = new ArrayBuffer[String]()

    // separate raw text columns from the rest, and drop the label column
    for (col <- inputDF.columns) {
        if (col.contains("rightname") || col.contains("producttitle")) {
            textCols += col
        } else {
            if (col != "label") {res += col}
        }
    }

    val sparseFeatures = new ArrayBuffer[String]()
    val denseFeatures = new ArrayBuffer[String]()

    // string-typed columns become sparse (categorical) features, everything else dense (numerical)
    for (col <- res) {
        if (inputDF.schema(col).dataType == StringType) {
            sparseFeatures += col
        } else {
            denseFeatures += col
        }
    }
    (sparseFeatures.toArray, denseFeatures.toArray)
}

Fill missing values:

def fillNan(inputDF: DataFrame, categoricalColumns: Array[String], numericalColumns: Array[String]): DataFrame = {
    // empty strings in categorical columns -> "-1"; nulls in numerical columns -> 0
    val fillCate = inputDF.na.replace(categoricalColumns, Map("" -> "-1"))
    val fillNum = fillCate.na.fill(0, numericalColumns)
    fillNum
}

For categoricalColumns I initially used na.fill as well, but for some reason it had no effect. My guess is that the "null" values in the original data were of a type Spark does not handle (the upstream data processing was done in Python), which is why the code above replaces empty strings instead.
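A quick way to check that guess (this diagnostic is my own addition, not part of the original pipeline) is to count real nulls versus empty strings for each categorical column:

import org.apache.spark.sql.functions.{col, count, when}

// count true nulls vs. empty strings per categorical column
for (c <- categoricalColumns) {
    inputDF.select(
        count(when(col(c).isNull, 1)).alias(s"${c}_nulls"),
        count(when(col(c) === "", 1)).alias(s"${c}_empty")
    ).show(false)
}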

Feature encoding

Categorical features are encoded with One-Hot Encoding. I started out with Label Encoding, but on reflection, Label Encoding depends on how often each value appears in the training data; since new training data is added every day, those frequency statistics can shift, whereas the set of unique values changes comparatively little. So as long as the number of unique values stays within a reasonable bound, One-Hot Encoding is the better choice.

First, build the Pipeline:

import scala.collection.mutable.ListBuffer
import org.apache.spark.ml.{Pipeline, PipelineModel, PipelineStage}
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, VectorAssembler}

def buildStages(categoricalColumns: Array[String], numericalColumns: Array[String]): Array[PipelineStage] = {

    val stagesArray = new ListBuffer[PipelineStage]()

    for (cate <- categoricalColumns) {
        println("-------------------------")
        println(cate)
        // index the string column, then one-hot encode the index;
        // handleInvalid = "keep" maps unseen categories to an extra index at transform time
        val indexer = new StringIndexer()
                .setInputCol(cate)
                .setOutputCol(s"${cate}Index")
                .setHandleInvalid("keep")
        val encoder = new OneHotEncoder()
                .setInputCol(s"${cate}Index")
                .setOutputCol(s"${cate}classVec")

        stagesArray.append(indexer, encoder)
    }

    // assemble the one-hot vectors and the numerical columns into a single "features" vector
    val assemblerInputs = categoricalColumns.map(_ + "classVec") ++ numericalColumns
    val assembler = new VectorAssembler().setInputCols(assemblerInputs).setOutputCol("features")
    stagesArray.append(assembler)

    stagesArray.toArray
}

Fit and transform, then return the transformed data together with the fitted pipeline model:

def pipelineProcess(inputDF: DataFrame, stages: Array[PipelineStage]): (DataFrame, PipelineModel) = {
    println(">> pipeline fit")
    val model = new Pipeline().setStages(stages).fit(inputDF)
    println(">> pipeline transform")
    val dataSet = model.transform(inputDF)

    (dataSet, model)
}
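For reference, these helpers could be wired together roughly like this (a sketch; the names dataSet and pipelineModel are the ones reused later in this post):

val (sparseFeatures, denseFeatures) = getEncCols(df)
val filledDF = fillNan(df, sparseFeatures, denseFeatures)
val stages = buildStages(sparseFeatures, denseFeatures)
val (dataSet, pipelineModel) = pipelineProcess(filledDF, stages)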

Dataset splitting

This works much like sklearn's train_test_split in Python, except there is no option for a stratified split.

val Array(train, test) = dataSet.randomSplit(Array(0.8, 0.2), seed = 4444)
val Array(trainDF, valDF) = train.randomSplit(Array(0.8, 0.2), seed = 4444)

That said, a stratified split can still be achieved, as sketched below: split the data into posDF and negDF, sample each by rows at the desired ratio, concatenate the two DataFrames, and finally shuffle the rows.
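A minimal sketch of that idea, assuming a binary label column named label (posDF/negDF and the ratios here are illustrative):

import org.apache.spark.sql.functions.{col, rand}

val posDF = dataSet.filter(col("label") === 1)
val negDF = dataSet.filter(col("label") === 0)

// sample each class with the same ratio, then union and shuffle the rows
val Array(posTrain, posTest) = posDF.randomSplit(Array(0.8, 0.2), seed = 4444)
val Array(negTrain, negTest) = negDF.randomSplit(Array(0.8, 0.2), seed = 4444)

val stratTrain = posTrain.union(negTrain).orderBy(rand(2020))
val stratTest = posTest.union(negTest).orderBy(rand(2020))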

Model

The API is much like the Python one, but the Python package also provides a scikit-learn compatible interface, so its API surface is richer.

import ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier

val paramMap = Map(
        "eta" -> 0.13,
        "verbosity" -> 2,
        "disable_default_eval_metric" -> 1,
        "max_depth" -> 5,
        "subsample" -> 0.85,
        "colsample_bytree" -> 0.85,
        "objective" -> "binary:logistic",
        "num_round" -> 300,
        "seed" -> 2020
    )

val classifier = new XGBoostClassifier(paramMap)
        .setLabelCol("label")
        .setFeaturesCol("features")
        .setEvalSets(Map("eval" -> valDF))
        .setEvalMetric("logloss")
        .setNumEarlyStoppingRounds(10)
        .setMaximizeEvaluationMetrics(false)  // logloss: lower is better
        .setSeed(2020)  // seed for the native (C++) code

Training:

val model = classifier.fit(trainDF)
println(model.summary.toString())  // prints the per-round metric history on the train and eval sets

Feature importance:

// with an empty feature-map path, features are named f0, f1, ... in assembler order
val featureScoreMap_gain = model.nativeBooster.getScore("", "gain")
val sortedScoreMap = featureScoreMap_gain.toSeq.sortBy(-_._2)  // sort by gain, descending
println(sortedScoreMap)

Evaluation

Use the transform method to get the model's predictions on the test set:

val oriPrediction = model.transform(testDF)

Note the output format of transform:

/**
  * When we get a model, either XGBoostClassificationModel or XGBoostRegressionModel,
  * it takes a DataFrame, reads the column containing feature vectors,
  * predicts for each feature vector, and outputs a new DataFrame.
  * XGBoostClassificationModel will output margins (rawPredictionCol),
  * probabilities (probabilityCol) and the eventual prediction labels (predictionCol) for each possible label.
  */

For a binary classification task, the fields we care about are the model's predicted probability and the true label. Below is what gets printed when the job runs on the cluster:

[screenshot of the prediction DataFrame omitted]

Create an evaluator object and compute the AUC:

import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

val evaluator = new BinaryClassificationEvaluator()
evaluator.setLabelCol("label")
evaluator.setRawPredictionCol("probability")  // the probability column serves as the raw score for AUC
evaluator.setMetricName("areaUnderROC")

/* AUC */
// prediction: the DataFrame returned by model.transform (oriPrediction above)
val AUC = evaluator.evaluate(prediction)

One easy-to-miss pitfall:

BinaryClassificationEvaluator's setMetricName only accepts areaUnderROC or areaUnderPR; it provides no way to compute logloss, so logloss has to be implemented by hand.

Both rawPrediction and probability are Vectors (usually DenseVector), where Vector is defined in org.apache.spark.ml.linalg. To compute logloss, you need to pull the value you want out of the Vector, convert it to Double or Float, and combine it with the label column.

Fortunately, Vector provides a toArray method, so we can do this with a custom UDF:

import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.functions.udf

val vectorToArray = udf((row: Vector) => row.toArray)

Then apply the UDF to the DataFrame.
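A minimal sketch of the manual logloss computation, assuming oriPrediction has a probability Vector column and a 0/1 label column (the column and variable names here are illustrative):

import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.functions._

// the probability of the positive class sits at index 1 of the probability vector
val posProb = udf((v: Vector) => v.toArray(1))

val eps = 1e-15  // clip to avoid log(0)
val withProb = oriPrediction.withColumn("p", posProb(col("probability")))

val logloss = withProb
  .select(avg(
    -(col("label") * log(greatest(col("p"), lit(eps)))
      + (lit(1.0) - col("label")) * log(greatest(lit(1.0) - col("p"), lit(eps))))
  ).alias("logloss"))
  .first()
  .getDouble(0)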

We can write the AUC and logloss of every training run to a table. After the next run, we first evaluate, then pull the historical AUC and logloss, compute their means, and compare them with the current values. If the gap exceeds a user-defined threshold, the newly trained model is flagged as problematic, since it could hurt online performance.
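Pulling the historical means that feed the snippet below might look like this (a sketch; the metrics table and the auc/logloss column names are assumptions of mine):

// read the historical metrics and average them as FLOAT, matching getFloat(0) below
val meanAUCRow = spark.sql(s"SELECT CAST(AVG(auc) AS FLOAT) FROM $targetTable").collect()
val meanLoglossRow = spark.sql(s"SELECT CAST(AVG(logloss) AS FLOAT) FROM $targetTable").collect()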

Core logic:

// returns 1 if the new model's metrics are acceptable relative to the history, 0 otherwise
if ((meanAUCRow != null && meanAUCRow.length > 0) && (meanLoglossRow != null && meanLoglossRow.length > 0)) {
    val meanAUCRowHead = meanAUCRow.head
    val meanLoglossRowHead = meanLoglossRow.head
    if ((meanAUCRowHead != null && meanAUCRowHead.length > 0 && meanAUCRowHead.get(0) != null)
            && (meanLoglossRowHead != null && meanLoglossRowHead.length > 0 && meanLoglossRowHead.get(0) != null)) {
        val meanAUC = meanAUCRowHead.getFloat(0)
        val meanLogloss = meanLoglossRowHead.getFloat(0)
        if (((meanAUC - testAUCFloat) <= thresh) || ((meanLogloss - testLoglossFloat) <= thresh)) {1}
        else {0}
    } else {0}
} else {0}

Based on the evaluation result, decide whether to persist the model:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem, Path}

/* evaluation stage */
println(">> Evaluating start...")
val status = evaluation(test, model, spark, targetTable, evalModelThresh)

// if status is 1, persist the model; otherwise do nothing
if (status == 1) {
    println(">> model persistence...")
    val hdfs: FileSystem = FileSystem.get(new Configuration())
    val outputStreamPipeline: FSDataOutputStream = hdfs.create(new Path(xgbPath))
    model.nativeBooster.saveModel(outputStreamPipeline)

    println(">> pipeline persistence...")
    serializePipelineModel(pipelineModel, pipelinePath)

} else {
    println("The AUC or logloss of the trained model is below the historical mean!")
}

To be continued.