Spark中的序列化

在写Spark应用时,常常会碰到序列化的问题。例如,在Driver端的程序中创建了一个对象,而在各个Executor端会用到这个对象——由于Driver端的代码和Executor端的代码在不同的JVM中,甚至在不同的节点上,因此必然要有相应

Java框架进行序列化

在默认情况下,Spark会使用Java的ObjectOutputStream框架对对象进行序列化,并且可以与任何实现java.io.Serializable的类一起工作。您还可以通过扩展java.io.Externalizable来更紧密地控制序列化的性能。Java序列化是灵活的,但通常相当慢,并且会导致许多类的大型序列化格式。

测试代码:

case class Student(id: String, name: String, age: Int, gender: String)

object SerializationDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName("SerializationDemo")
val sc = new SparkContext(conf)

val stduentArr = new ArrayBuffer[Student]()
for (i <- 1 to 1000000) {
stduentArr += (Student(i + "", i + "a", 10, "male"))
}
val JavaSerialization = sc.parallelize(stduentArr)
JavaSerialization.persist(StorageLevel.MEMORY_ONLY_SER).count()

while(true) {
Thread.sleep(10000)
}
sc.stop()
}
}

在Web界面查看:33.2M

Kryo框架进行序列化

Spark还可以使用Kryo库(Spark 2.x)来更快地序列化对象。Kryo比Java(通常多达10倍)要快得多,也更紧凑,但是不支持所有可串行化类型,并且要求您提前注册您将在程序中使用的类,以获得最佳性能。

  1. 不注册使用的类测试

    在conf中配置spark.serializer = org.apache.spark.serializer.KryoSerializer来使用kryo序列化

    case class Student(id: String, name: String, age: Int, gender: String)

    object SerializationDemo {
    def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    .setMaster("local[2]")
    .setAppName("SerializationDemo")
    .set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)

    val stduentArr = new ArrayBuffer[Student]()
    for (i <- 1 to 1000000) {
    stduentArr += (Student(i + "", i + "a", 10, "male"))
    }
    val JavaSerialization = sc.parallelize(stduentArr)
    JavaSerialization.persist(StorageLevel.MEMORY_ONLY_SER).count()

    while(true) {
    Thread.sleep(10000)
    }
    sc.stop()
    }
    }

    在Web界面查看:53.2M

    这是因为使用Kryo时,不将使用的类注册,往往会得到比java序列化占用更大的内存

  2. 注册使用的类测试

    case class Student(id: String, name: String, age: Int, gender: String)

    object SerializationDemo {
    def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    .setMaster("local[2]")
    .setAppName("SerializationDemo")
    .set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
    .registerKryoClasses(Array(classOf[Student])) // 将自定义的类注册到Kryo
    val sc = new SparkContext(conf)

    val stduentArr = new ArrayBuffer[Student]()
    for (i <- 1 to 1000000) {
    stduentArr += (Student(i + "", i + "a", 10, "male"))
    }
    val JavaSerialization = sc.parallelize(stduentArr)
    JavaSerialization.persist(StorageLevel.MEMORY_ONLY_SER).count()

    while(true) {
    Thread.sleep(10000)
    }
    sc.stop()
    }
    }

    在Web界面查看:21.7 M

    在conf中注册

    val conf = new SparkConf().setMaster(...).setAppName(...)
    conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
    val sc = new SparkContext(conf)

总结及拓展

Kryo serialization 性能和序列化大小都比默认提供的 Java serialization 要好,但是使用Kryo需要将自定义的类先注册进去,使用起来比 Java serialization麻烦。自从Spark 2.x 以来,我们在使用简单类型、简单类型数组或字符串类型的简单类型来调整RDDs时,在内部使用Kryo序列化器。

通过查找sparkcontext初始化的源码,可以发现某些类型已经在sparkcontext初始化的时候被注册进去。

 /**
* Component which configures serialization, compression and encryption for various Spark
* components, including automatic selection of which [[Serializer]] to use for shuffles.
*/
private[spark] class SerializerManager(
defaultSerializer: Serializer,
conf: SparkConf,
encryptionKey: Option[Array[Byte]]) {

def this(defaultSerializer: Serializer, conf: SparkConf) = this(defaultSerializer, conf, None)

private[this] val kryoSerializer = new KryoSerializer(conf)

private[this] val stringClassTag: ClassTag[String] = implicitly[ClassTag[String]]
private[this] val primitiveAndPrimitiveArrayClassTags: Set[ClassTag[_]] = {
val primitiveClassTags = Set[ClassTag[_]](
ClassTag.Boolean,
ClassTag.Byte,
ClassTag.Char,
ClassTag.Double,
ClassTag.Float,
ClassTag.Int,
ClassTag.Long,
ClassTag.Null,
ClassTag.Short
)
val arrayClassTags = primitiveClassTags.map(_.wrap)
primitiveClassTags ++ arrayClassTags

也就是说,Boolean、Byte、Char、Double、Float、Int、Long、Null、Short这些类型修饰的属性,自动使用kryo序列化。

Author: Tunan
Link: http://yerias.github.io/2019/10/12/spark/12/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.