Spark 读写压缩文件的一次简单尝试

前言

我认为以节省存储空间为角度出发,Spark作业中的读写压缩文件是必不可少的话题,当然这在MR作业中也有体现和实际解决这种问题,现在我们就要在Spark中解决这种问题。

如果需要安装Lzo可以看我的其他文章

源文件是一份access的原始数据

我们在上传到服务器上的时候,使用lzop命令压缩该文件,得到压缩后的文件,上传HDFSF并对该文件创建索引,得到我们代码中需要处理的源文件

需要注意的是未压缩的文件是1G,压缩后为314M

下图是我们想要做的事情

各种压缩格式的性能比较

压缩格式 工具 算法 扩展名 是否支持分割 Hadoop编码/解码器 hadoop自带
DEFLATE N/A DEFLATE .deflate No org.apache.hadoop.io.compress.DefalutCodec
gzip gzip DEFLATE .gz No org.apache.hadoop.io.compress.GzipCodec
bzip2 bzip2 bzip2 .bz2 yes org.apache.hadoop.io.compress.Bzip2Codec
LZO Lzop LZO .lzo yes(建索引) com.hadoop.compression.lzo.LzoCodec
LZ4 N/A LZ4 .lz4 No org.apache.hadoop.io.compress.Lz4Codec
Snappy N/A Snappy .snappy No org.apache.hadoop.io.compress.SnappyCodec

压缩比:Snappy<LZ4<LZO<GZIP<BZIP2

各种压缩格式的性能优缺点

压缩格式 特点
lzo 优点:压缩比在四种压缩方式中较高;hadoop本身支持,在应用中处理gzip格式的文件就和直接处理文本一样;有hadoop native库;大部分linux系统都自带gzip命令,使用方便
缺点:不支持split
snappy 优点:压缩/解压速度也比较快,合理的压缩率;支持split,是hadoop中最流行的压缩格式;支持hadoop native库;需要在linux系统下自行安装lzop命令,使用方便
缺点:压缩率比gzip要低;hadoop本身不支持,需要安装;lzo虽然支持split,但需要对lzo文件建索引,否则hadoop也是会把lzo文件看成一个普通文件(为了支持split需要建索引,需要指定inputformat为lzo格式)
gzip 优点:压缩速度快;支持hadoop native库
缺点:不支持split;压缩比低;hadoop本身不支持,需要安装;linux系统下没有对应的命令
bzip2 优点:支持split;具有很高的压缩率,比gzip压缩率都高;hadoop本身支持,但不支持native;在linux系统下自带bzip2命令,使用方便
缺点:压缩/解压速度慢;不支持native

读LZO文件写HDFS

需要注意的是在Spark中读取HDFS上的压缩文件,需要使用newAPIHadoopFile接口,并且传入LzoTextInputFormat,这个依赖不好解决,仓库是twitter的,下载不了,最好的办法是去github下载源码,install到本地

<dependency>
<groupId>com.hadoop.gplcompression</groupId>
<artifactId>hadoop-lzo</artifactId>
<version>0.4.20</version>
</dependency>

读取文件的具体代码

val linesRDD: RDD[String] = sc.newAPIHadoopFile(in, classOf[LzoTextInputFormat], classOf[LongWritable], classOf[Text])  .map(x => x._2.toString)

现在我们已经读进来了,可以实现我们的业务逻辑,那么最后在Spark Core中如何写出去呢? 只需要使用saveAsTextFile

etl(linesRDD,hashMap)  .filter(x => x.responseSize != 0).saveAsTextFile(out)

读LZO文件写HDFS也为LZO文件

读LZO文件上面的方法同样适用,而写为LZO文件只需要在saveAsTextFile中指定LzopCodec即可,适用于所有的格式压缩

etl(linesRDD,hashMap)  .filter(x => x.responseSize != 0).saveAsTextFile(out,classOf[LzopCodec])

读LZO文件写MYSQL

读写表和读写文件不同,读写表更适合用Spark SQL来实现,现在我们的表的字段非常多,不适用于使用tuple的实现方式,从而自定义外部数据源,在TableScan中也是实现与上面相同代码,就能将lzo文件分片读写

override def buildScan(): RDD[Row] = {
logError("进入buildScan方法")
// 使用RDD拿到Lzo本文内容
val lines = sqlContext.sparkContext.newAPIHadoopFile(path, classOf[LzoTextInputFormat], classOf[LongWritable], classOf[Text])
.map(_._2.toString)
// 拿到响应的schema信息
val fields = schema.fields
// 拿到每行数据,做简单处理,返回RDD[Row]
lines.map(_.split(",").map(_.trim)).map(_.zipWithIndex.map {
case (value, index) =>
Utils.caseTo(value, fields(index).dataType)
}).map(x => Row.fromSeq(x))
}

我在这里尝试了压缩写和普通写,都可以查询到数据

val properties = new Properties()
properties.put("user","root")
properties.put("password","root")

val url = "jdbc:mysql://hadoop:3306/tunan?useUnicode=true&characterEncoding=utf-8"

if (!flat){
// 保存到MySQL
result.write.mode(SaveMode.Overwrite)
.jdbc(url,"domain_group",properties)
}else{
// 保存到MySQL,压缩格式为lzo
result.write.format("orc").option("compression","lzo").mode("overwrite")
.jdbc(url,"domain_group",properties)
}

查询:

读LZO文件写HIVE

读的方式也是一样的,使用外部数据源的方式直接得到DataFrame,在写的时候指定了存储格式,而不指定压缩格式的话,会默认指定压缩格式为Snappy

textDf.write.format("parquet").mode("overwrite").saveAsTable("store_format.parquet_tb")

读LZO文件压缩写HIVE

读Lzo的方式如上,在这里我们尝试了几种格式的压缩写

  1. 指定了存储格式为parquet,压缩格式为lzo

    textDf.write.format("parquet").option("compression","lzo").mode("overwrite").saveAsTable("store_format.parquet_tb")

    可以看得到压缩格式和存储格式发生了变化,我们继续查表

  2. 现在我们将压缩格式改为orc,存储格式还是使用lzo

    textDf.write.format("orc").option("compression","lzo").mode("overwrite").saveAsTable("store_format.parquet_tb")

    我们发现orc和lzo格式并不能一起使用,查出来的是无效的结果

  3. 继续使用压缩格式为orc,存储格式改为snappy

    textDf.write.format("orc").option("compression","snappy").mode("overwrite").saveAsTable("store_format.parquet_tb")

    查询结果:

总结

  1. 使用Lzo压缩的压缩比相比于Snappy较高
  2. 无论哪种压缩格式在MySQL中都是无效的
  3. Parquet可以和Snappy或者Lzo搭配使用,ORC可以和Snappy或者Bzip搭配使用
  4. ORC不能和Lzo搭配使用
Author: Tunan
Link: http://yerias.github.io/2019/10/23/spark/23/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.