在写Spark应用时,常常会碰到序列化的问题。例如,在Driver端的程序中创建了一个对象,而在各个Executor端会用到这个对象——由于Driver端的代码和Executor端的代码在不同的JVM中,甚至在不同的节点上,因此必然要有相应
Java框架进行序列化
在默认情况下,Spark会使用Java的ObjectOutputStream框架对对象进行序列化,并且可以与任何实现java.io.Serializable的类一起工作。您还可以通过扩展java.io.Externalizable来更紧密地控制序列化的性能。Java序列化是灵活的,但通常相当慢,并且会导致许多类的大型序列化格式。
测试代码:
case class Student(id: String, name: String, age: Int, gender: String) |
在Web界面查看:33.2M
Kryo框架进行序列化
Spark还可以使用Kryo库(Spark 2.x)来更快地序列化对象。Kryo比Java(通常多达10倍)要快得多,也更紧凑,但是不支持所有可串行化类型,并且要求您提前注册您将在程序中使用的类,以获得最佳性能。
不注册使用的类测试
在conf中配置spark.serializer = org.apache.spark.serializer.KryoSerializer来使用kryo序列化
case class Student(id: String, name: String, age: Int, gender: String)
object SerializationDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("local[2]")
.setAppName("SerializationDemo")
.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)
val stduentArr = new ArrayBuffer[Student]()
for (i <- 1 to 1000000) {
stduentArr += (Student(i + "", i + "a", 10, "male"))
}
val JavaSerialization = sc.parallelize(stduentArr)
JavaSerialization.persist(StorageLevel.MEMORY_ONLY_SER).count()
while(true) {
Thread.sleep(10000)
}
sc.stop()
}
}在Web界面查看:53.2M
这是因为使用Kryo时,不将使用的类注册,往往会得到比java序列化占用更大的内存
注册使用的类测试
case class Student(id: String, name: String, age: Int, gender: String)
object SerializationDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("local[2]")
.setAppName("SerializationDemo")
.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
.registerKryoClasses(Array(classOf[Student])) // 将自定义的类注册到Kryo
val sc = new SparkContext(conf)
val stduentArr = new ArrayBuffer[Student]()
for (i <- 1 to 1000000) {
stduentArr += (Student(i + "", i + "a", 10, "male"))
}
val JavaSerialization = sc.parallelize(stduentArr)
JavaSerialization.persist(StorageLevel.MEMORY_ONLY_SER).count()
while(true) {
Thread.sleep(10000)
}
sc.stop()
}
}在Web界面查看:21.7 M
在conf中注册
val conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)
总结及拓展
Kryo serialization 性能和序列化大小都比默认提供的 Java serialization 要好,但是使用Kryo需要将自定义的类先注册进去,使用起来比 Java serialization麻烦。自从Spark 2.x 以来,我们在使用简单类型、简单类型数组或字符串类型的简单类型来调整RDDs时,在内部使用Kryo序列化器。
通过查找sparkcontext初始化的源码,可以发现某些类型已经在sparkcontext初始化的时候被注册进去。
/** |
也就是说,Boolean、Byte、Char、Double、Float、Int、Long、Null、Short这些类型修饰的属性,自动使用kryo序列化。