目录
- Spark自带监控
- Spark接口监控
- Spark自定义监控
Spark自带监控
第一种监控方式是Spark自带的,由于Spark Web UI界面只在sc的生命周期内有效,所以我们需要存储日志,在Spark sc 生命周期结束后重构UI界面。
首先看官方文档配置,这里只是简单配置
修改spark.default.xml
#开启日志存储
spark.eventLog.enabled true
#指定日志存储的HDFS目录
spark.eventLog.dir hdfs://hadoop:9000/spark-logs
#开启日志存储7天自动删除
spark.history.fs.cleaner.enabled true修改spark.env.xml
#指定日志恢复目录,就是上面的日志存储目录
SPARK_HISTORY_OPTS = "-Dspark.history.fs.logDirectory=hdfs://hadoop:9000/spark-logs"在 sc 的生命周期外打开历史UI界面
Spark接口监控
首先看官方文档配置,这里只是简单介绍
查看application列表:
http://hadoop:18080/api/v1/applications |
查看application的所有job
http://hadoop:18080/history/application_1585632916452_0002/jobs/ |
Spark自定义监控
metrics: 数据信息
spark 提供了一系列整个任务生命周期中各个阶段变化的事件监听机制,通过这一机制可以在任务的各个阶段做一些自定义的各种动作。SparkListener便是这些阶段的事件监听接口类 通过实现这个类中的各种方法便可实现自定义的事件处理动作。
自定义监听sparListener后的注册方式有两种:
方法1:conf 配置中指定
val conf = new SparkConf() |
方法2:sparkContext 类中指定
sc.addSparkListener(new MySparkAppListener) |
SparkListerner
//SparkListener 下各个事件对应的函数名非常直白,即如字面所表达意思。 |
Demo
首先看官方文档配置,这里只是简单案例
- 自定义监控类,继承SparkListener
- 重写onTaskEnd方法,拿到taskMetrics
- 从taskMetrics获取各种数据信息
- 注册到被监听的类
第1-3步代码
class MySparkListener(conf:SparkConf) extends SparkListener with Logging{ |
第四步代码
object WordCount { |
2020-11-10更新
taskEnd.taskInfo.status
该参数决定作业的成功或者失败
def status: String = { |
小demo:
val status: String = taskEnd.taskInfo.status |
注意如果是大范围的实现某个abstract或者trait,最好加一层中间的代码过度达到解耦,我们实现的是中间的过度接口,与原来的接口隔绝来,这样如果升级版本的时候源码有所改动我们只需要改中间层的继承接口。
Spark自定义监控案例
把监控参数写入到MySQL
需求:应用程序名字、jobID号、stageID号、taskID号、读取数据量、写入数据量、shuffle读取数据量、shuffle写入数据量。
建表
create table wc2mysql(
app_name varchar(32),
job_id bigint,
stage_id bigInt,
task_id bigint,
file_read_byte bigint,
file_write_byte bigint,
shuffle_read_byte bigint,
shuffle_write_byte bigint
)实现监控类
class MySparkListenerV2(conf: SparkConf) extends SparkListener with Logging {
//定义JobID
var jobId:Long = _
override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
//初始化
jobId = jobStart.jobId
}
println("============准备插入数据============")
//监听每个Task结束
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
//获取应用程序名称
val appName = conf.get("spark.app.name")
//获取作业的taskMetrics
val metrics = taskEnd.taskMetrics
//使用对象接收参数
val listener = Listener(appName, jobId, taskEnd.stageId, taskEnd.taskInfo.taskId, metrics.inputMetrics.bytesRead, metrics.outputMetrics.bytesWritten, metrics.shuffleReadMetrics.totalBytesRead, metrics.shuffleWriteMetrics.bytesWritten)
//结果插入到MySQL
ListenerCURD.insert(listener)
//发送监控邮件
if ("true" == conf.get("spark.send.mail.enabled")){
MsgUtils.send("971118017@qq.com", "ERROR:数据异常", s"jobID: $jobId 数据异常,请马上检查: ${listener.toString}")
}
}
println("============成功插入数据============")
}实现监控数据写入MySQL
使用的是scalikejdbc框架实现的,使用具体方法在我的博客
case class Listener(app_name: String, job_id: Long, stage_id: Long, task_id: Long, file_read_byte: Long, file_write_byte: Long, shuffle_read_byte: Long, shuffle_write_byte: Long)
object ListenerCURD {
def Before(): Unit = {
//初始化配置
DBs.setupAll()
}
def insert(listener: Listener): Unit = {
Before()
//事物插入
DB.localTx {
implicit session => {
SQL("insert into wc2mysql(app_name,job_id,stage_id,task_id,file_read_byte,file_write_byte,shuffle_read_byte,shuffle_write_byte) values(?,?,?,?,?,?,?,?)")
.bind(listener.app_name,listener.job_id, listener.stage_id, listener.task_id, listener.file_read_byte, listener.file_write_byte, listener.shuffle_read_byte, listener.shuffle_write_byte)
.update()
.apply()
}
}
After()
}
def After(): Unit = {
//关闭资源
DBs.closeAll()
}
}启动被监控类
被监控的类还是我们上面的WordCount的类,关键在于在SparkConf()中注册
//配置conf
val conf = new SparkConf()
.setAppName(getClass.getSimpleName)
.setMaster("local[2]")
//监听类注册
.set("spark.extraListeners", "com.tunan.spark.listener.MySparkListenerV2")在数据库中查询
select * from wc2mysql;