Spark Monitoring Module

Table of Contents

  1. Spark Built-in Monitoring
  2. Spark REST API Monitoring
  3. Spark Custom Monitoring

Spark Built-in Monitoring

The first monitoring option is the one Spark ships with. Because the Spark Web UI is only available during the lifetime of the SparkContext (sc), we need to persist event logs so that the UI can be rebuilt after the sc has stopped.

See the official documentation for the full set of options; only a minimal configuration is shown here.

  1. Edit spark-defaults.conf

    # enable event logging
    spark.eventLog.enabled true
    # HDFS directory where the event logs are stored
    spark.eventLog.dir hdfs://hadoop:9000/spark-logs
    # enable automatic cleanup of old event logs (logs older than 7 days are removed by default)
    spark.history.fs.cleaner.enabled true
  2. Edit spark-env.sh

    # point the history server at the event log directory configured above
    SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=hdfs://hadoop:9000/spark-logs"
  3. Open the history UI outside the sc lifecycle (see the note below)
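Once these settings are in place, start the history server with $SPARK_HOME/sbin/start-history-server.sh and open it in a browser; it listens on port 18080 by default, which is the port used in the URLs in the next section.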

Spark REST API Monitoring

See the official documentation for the full REST API; only a brief introduction is given here.

List all applications:

http://hadoop:18080/api/v1/applications

List all jobs of an application (this URL is the history UI page; the REST equivalent is /api/v1/applications/[app-id]/jobs):

http://hadoop:18080/history/application_1585632916452_0002/jobs/
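The endpoints can also be consumed programmatically. As a small illustration (my addition, assuming the history server runs on hadoop:18080 as configured above), the sketch below fetches the application list and prints the raw JSON:

import scala.io.Source

object ListApplications {
  def main(args: Array[String]): Unit = {
    // history server REST endpoint (host and port follow this post's setup)
    val url = "http://hadoop:18080/api/v1/applications"
    val source = Source.fromURL(url)
    try {
      // print the raw JSON returned by the history server
      println(source.mkString)
    } finally {
      source.close()
    }
  }
}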

Spark Custom Monitoring

metrics: the statistics Spark records about the data a task processes (bytes read/written, shuffle read/write, and so on)

Spark provides an event-listener mechanism covering every phase of an application's lifecycle, which lets you run custom actions at each of those phases. SparkListener is the listener class for these events; by overriding its methods you can implement your own event handling.

After writing a custom SparkListener, there are two ways to register it (a minimal end-to-end sketch follows the two methods below):

Method 1: specify it in the SparkConf configuration

val conf = new SparkConf()
  .setAppName(getClass.getSimpleName)
  .setMaster("local[2]")
  // register the listener class
  .set("spark.extraListeners", "com.tunan.spark.listener.MySparkListener")
val sc = new SparkContext(conf)

Method 2: register it on the SparkContext

sc.addSparkListener(new MySparkAppListener)
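As a minimal end-to-end sketch (my own illustration, class and object names are made up), the listener below only reacts to job completion and is registered with the second method:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}

// a listener that only overrides the job-end callback
class JobEndListener extends SparkListener {
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    println(s"job ${jobEnd.jobId} finished with result ${jobEnd.jobResult}")
  }
}

object ListenerQuickStart {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ListenerQuickStart").setMaster("local[2]")
    val sc = new SparkContext(conf)
    // method 2: register the listener programmatically
    sc.addSparkListener(new JobEndListener)
    // trigger one job so the listener fires
    sc.parallelize(1 to 100).count()
    sc.stop()
  }
}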

SparkListener

// The callback names in SparkListener are self-explanatory: each one corresponds to the event it is named after.
// To run custom actions on a given phase, extend SparkListener and override the corresponding method.

abstract class SparkListener extends SparkListenerInterface {
  // called when a stage completes
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { }

  // called when a stage is submitted
  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = { }

  // called when a task starts
  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { }

  // called when a task result is being fetched
  override def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult): Unit = { }

  // called when a task ends
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { }

  // called when a job starts
  override def onJobStart(jobStart: SparkListenerJobStart): Unit = { }

  // called when a job ends
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { }

  // called when environment properties are updated
  override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate): Unit = { }

  // called when a block manager is added
  override def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit = { }

  // called when a block manager is removed
  override def onBlockManagerRemoved(
      blockManagerRemoved: SparkListenerBlockManagerRemoved): Unit = { }

  // called when an RDD is unpersisted
  override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD): Unit = { }

  // called when the application starts
  override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = { }

  // called when the application ends (the remaining callbacks follow the same naming pattern and are not annotated one by one)
  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { }

  override def onExecutorMetricsUpdate(
      executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = { }

  override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = { }

  override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = { }

  override def onExecutorBlacklisted(
      executorBlacklisted: SparkListenerExecutorBlacklisted): Unit = { }

  override def onExecutorUnblacklisted(
      executorUnblacklisted: SparkListenerExecutorUnblacklisted): Unit = { }

  override def onNodeBlacklisted(
      nodeBlacklisted: SparkListenerNodeBlacklisted): Unit = { }

  override def onNodeUnblacklisted(
      nodeUnblacklisted: SparkListenerNodeUnblacklisted): Unit = { }

  override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = { }

  override def onOtherEvent(event: SparkListenerEvent): Unit = { }
}

Demo

See the official documentation for more detail; this is just a simple example.

  1. Define a custom listener class that extends SparkListener
  2. Override the onTaskEnd method to get the taskMetrics
  3. Extract the desired metrics from taskMetrics
  4. Register the listener on the application to be monitored

Code for steps 1-3

import org.apache.spark.SparkConf
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.json4s.DefaultFormats
import org.json4s.jackson.Json

// Logging is assumed to be a trait exposing logError (e.g. a project-local slf4j wrapper);
// Spark's own org.apache.spark.internal.Logging is not public API.
class MySparkListener(conf: SparkConf) extends SparkListener with Logging {
  // called every time a task finishes
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    // application name
    val appName = conf.get("spark.app.name")
    // task-level metrics of the finished task
    val metrics = taskEnd.taskMetrics
    // collect the values into one object (argument order matches the Metrics fields)
    val metricsObject = Metrics(appName, taskEnd.stageId, taskEnd.taskInfo.taskId,
      metrics.inputMetrics.bytesRead, metrics.outputMetrics.bytesWritten,
      metrics.shuffleReadMetrics.totalBytesRead, metrics.shuffleWriteMetrics.bytesWritten)
    // log the metrics as a plain string
    logError(metricsObject.toString)
    // log the metrics as JSON
    logError(Json(DefaultFormats).write(metricsObject))
  }

  // case class holding the collected metrics
  case class Metrics(appName: String, stageId: Long, taskId: Long, bytesRead: Long, bytesWritten: Long, shuffleReadMetrics: Long, shuffleWriteMetrics: Long) {
    override def toString: String = s"appName:$appName,stageId:$stageId,taskId:$taskId,bytesRead:$bytesRead,bytesWritten:$bytesWritten,shuffleReadMetrics:$shuffleReadMetrics,shuffleWriteMetrics:$shuffleWriteMetrics"
  }
}

Code for step 4

import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    // input and output paths
    val (in, out) = (args(0), args(1))
    // build the conf
    val conf = new SparkConf()
      .setAppName(getClass.getSimpleName)
      .setMaster("local[2]")
      // register the listener class
      .set("spark.extraListeners", "com.tunan.spark.listener.MySparkListener")
    // create the SparkContext
    val sc = new SparkContext(conf)
    // delete the output directory if it already exists (project-specific helper)
    CheckHDFSOutPath.ifExistsDeletePath(new Configuration(), out)
    // word count
    val result = sc.textFile(in).flatMap(_.split("\t")).map((_, 1)).reduceByKey(_ + _)
    // save the result
    result.saveAsTextFile(out)
    // stop the SparkContext
    sc.stop()
  }
}

Update 2020-11-10

taskEnd.taskInfo.status indicates whether a task succeeded or failed:

def status: String = {
  if (running) {
    if (gettingResult) {
      "GET RESULT"
    } else {
      "RUNNING"
    }
  } else if (failed) {
    "FAILED"
  } else if (killed) {
    "KILLED"
  } else if (successful) {
    "SUCCESS"
  } else {
    "UNKNOWN"
  }
}

A small demo:

val status: String = taskEnd.taskInfo.status
if ("SUCCESS".equals(status)) {
  println("task finished successfully")
}
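A slightly extended sketch (my addition, meant to live inside onTaskEnd) that also reacts to failed or killed tasks could look like this:

// inside onTaskEnd: branch on the task status
taskEnd.taskInfo.status match {
  case "SUCCESS" =>
    println(s"task ${taskEnd.taskInfo.taskId} finished successfully")
  case "FAILED" | "KILLED" =>
    // hook your own alerting (mail, IM, ...) in here
    println(s"task ${taskEnd.taskInfo.taskId} ended with status ${taskEnd.taskInfo.status}, reason: ${taskEnd.reason}")
  case other =>
    println(s"task ${taskEnd.taskInfo.taskId} ended with status $other")
}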

Note: if you implement an abstract class or trait in many places, it is better to add an intermediate layer for decoupling. Implement a single intermediate class that extends the original interface, and have all your own listeners extend that layer instead. If a version upgrade changes the underlying source, only the intermediate layer needs to be adapted. A sketch of this idea follows.
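A minimal sketch of that intermediate layer (my own illustration, class names are made up):

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// the only place in the project that extends Spark's SparkListener directly;
// if a Spark upgrade changes the listener API, adapt it here once
abstract class BaseSparkListener extends SparkListener

// all project listeners extend the intermediate layer instead of SparkListener
class TaskEndLogger extends BaseSparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    println(s"task ${taskEnd.taskInfo.taskId} ended with status ${taskEnd.taskInfo.status}")
  }
}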

Spark Custom Monitoring Example

Write the monitoring metrics to MySQL.

Requirement: record the application name, job ID, stage ID, task ID, bytes read, bytes written, shuffle bytes read, and shuffle bytes written.

  1. Create the table

    create table wc2mysql(
    app_name varchar(32),
    job_id bigint,
    stage_id bigint,
    task_id bigint,
    file_read_byte bigint,
    file_write_byte bigint,
    shuffle_read_byte bigint,
    shuffle_write_byte bigint
    );
  2. Implement the listener class

    class MySparkListenerV2(conf: SparkConf) extends SparkListener with Logging {

      // current job ID, refreshed whenever a job starts
      var jobId: Long = _

      override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
        // remember the ID of the job that just started
        jobId = jobStart.jobId
      }

      // called every time a task finishes
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
        // application name
        val appName = conf.get("spark.app.name")

        // task-level metrics of the finished task
        val metrics = taskEnd.taskMetrics

        // collect the values into one object
        val listener = Listener(appName, jobId, taskEnd.stageId, taskEnd.taskInfo.taskId, metrics.inputMetrics.bytesRead, metrics.outputMetrics.bytesWritten, metrics.shuffleReadMetrics.totalBytesRead, metrics.shuffleWriteMetrics.bytesWritten)

        println("============ inserting metrics ============")
        // write the result to MySQL
        ListenerCURD.insert(listener)
        println("============ metrics inserted ============")

        // send an alert mail; the "false" default keeps conf.get from throwing when the key is not set
        if ("true" == conf.get("spark.send.mail.enabled", "false")) {
          MsgUtils.send("971118017@qq.com", "ERROR: data anomaly", s"jobID: $jobId data anomaly, please check immediately: ${listener.toString}")
        }
      }
    }
  3. Write the monitoring data to MySQL

    This is implemented with the scalikejdbc framework; detailed usage is covered elsewhere on my blog (a sketch of the assumed connection config follows this list).

    import scalikejdbc._
    import scalikejdbc.config.DBs

    case class Listener(app_name: String, job_id: Long, stage_id: Long, task_id: Long, file_read_byte: Long, file_write_byte: Long, shuffle_read_byte: Long, shuffle_write_byte: Long)

    object ListenerCURD {

      def Before(): Unit = {
        // initialise the connection pool from the configuration file
        DBs.setupAll()
      }

      def insert(listener: Listener): Unit = {
        Before()
        // insert inside a local transaction
        DB.localTx { implicit session =>
          SQL("insert into wc2mysql(app_name,job_id,stage_id,task_id,file_read_byte,file_write_byte,shuffle_read_byte,shuffle_write_byte) values(?,?,?,?,?,?,?,?)")
            .bind(listener.app_name, listener.job_id, listener.stage_id, listener.task_id, listener.file_read_byte, listener.file_write_byte, listener.shuffle_read_byte, listener.shuffle_write_byte)
            .update()
            .apply()
        }
        After()
      }

      def After(): Unit = {
        // release the connection pool
        DBs.closeAll()
      }
    }
  4. Launch the monitored application

    The monitored application is still the WordCount class from above; the key point is registering the listener in SparkConf()

    // build the conf
    val conf = new SparkConf()
      .setAppName(getClass.getSimpleName)
      .setMaster("local[2]")
      // register the listener class
      .set("spark.extraListeners", "com.tunan.spark.listener.MySparkListenerV2")
  5. Query the results in the database

    select * from wc2mysql;
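For completeness: DBs.setupAll() used in step 3 expects the JDBC connection settings in an application.conf on the classpath. The snippet below is only an assumed sketch of such a file; the driver, host, database, user, and password are placeholders, not values from the original setup.

# scalikejdbc-config connection settings (values are placeholders)
db.default.driver="com.mysql.jdbc.Driver"
db.default.url="jdbc:mysql://hadoop:3306/test?characterEncoding=utf8"
db.default.user="root"
db.default.password="password"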