Hive运行SQL重置进度,如何控制map个数与参数调优

背景

生产上执行一条简单的sql却一直失败,猜测是Container的Memory或者Core超出了阈值,自动被杀了。

insert into table t select * from t2

目前已知的是文件大小200M,压缩格式是ORC,从日志中发现了一个可疑的地方,mapper的个数是1,怀疑是mapper的个数太少了,遂增加mapper的个数后,执行成功。

思考

Hive有哪些参数,如何查看这些参数

  1. Hive自带的配置属性列表封装在HiveConf类中,因此请参阅该HiveConf.java文件以获取Hive版本中可用的配置属性的完整列表。具体可以下载hive.src通过eclipse查看。全部属性有上千个吧,一般Hive的自带属性都是以hive.开头的,每个属性且自带详细的描述信息,其次Hive官网也有,但是属性不是特别全。Hive官方参数网址
  2. Hive除了自身带了一些配置属性,因为其底层使用的是hadoop(HDFS,MR,YARN),所以有些HADOOP的配置属性Hive也可以使用,进行配置,但是有些则使用不了。比如mapred.max.split.size 就属于MR的参数,但是hive可以使用。

map个数的控制参数与性能调优

很显然,对于这个控制每个map的split输入大小的参数,不是hive自带的参数,而是MR提供的参数,但是Hive可以通过set的形式配置使用,而且对于调优有很大的作用。但是这个参数实际上要配合HDFS的blocksize一起使用。

-- 每个Map最大输入大小,
hive> set mapred.max.split.size;
mapred.max.split.size=256000000
-- 每个Map最小输入大小
hive> set mapred.min.split.size;
mapred.min.split.size=10000000
hive> set dfs.block.size;
dfs.block.size=134217728 我们集群默认hdfs的block块大小是128Mb,但注意这个参数通过hive设置更改实际没有用的,只能hdfs设置。

数据准备,两张表

如下进行两张表join,其中每张表的大小,hdfs上存储的文件总个数,以及每个文件的大小。

  1. 大表总共158749566行,文件总大小4.4G,存储个数22个文件,每个大小200Mb左右。
  2. 小表总共1979375 行,文件大小50.7Mb,存储个数2个文件,大小50Mb以内。
[finance@hadoop-client13-prd ~]$ hadoop fs -du  -h  hdfs://suninghadoop2/user/finance/hive/warehouse/fdm_tmp.db/company_liquidation_fgeics_company_ar_d
206.7 M hdfs://suninghadoop2/user/finance/hive/warehouse/fdm_tmp.db/company_liquidation_fgeics_company_ar_d/000000_0.deflate
.....省略.................
hdfs://suninghadoop2/user/finance/hive/warehouse/fdm_tmp.db/company_liquidation_fgeics_company_ar_d/000021_0.deflate
---------------------------------------------------------------------------------------
[finance@hadoop-client13-prd ~]$ hadoop fs -du -h hdfs://suninghadoop2/user/finance/hive/warehouse/fdm_tmp.db/t_fgeics_company_liquidation_d_tmp
36.4 M hdfs://suninghadoop2/user/finance/hive/warehouse/fdm_tmp.db/t_fgeics_company_liquidation_d_tmp/000000_0.deflate
14.3 M hdfs://suninghadoop2/user/finance/hive/warehouse/fdm_tmp.db/t_fgeics_company_liquidation_d_tmp/000001_0.deflate

两个表进行关联,其中小表在前,大表在后

如下,运行如下代码,实现两个表进行关联。

set mapred.max.split.size=134217728;
set mapred.min.split.size=134217728;
drop table IF EXISTS fdm_tmp.company_liquidation_fgeics_company_ar_d_tmp;
create table fdm_tmp.company_liquidation_fgeics_company_ar_d_tmp
as
select
a.id
,a.entid
,a.ancheyear
,b.liqmen
,b.ligprincipal
,a.regno
,a.tel
,a.postalcode
,a.dom
,a.email
,a.busst
,a.empnum
,a.name
,a.updated
,b.etl_time
from fdm_tmp.t_fgeics_company_liquidation_d_tmp b
right join fdm_tmp.company_liquidation_fgeics_company_ar_d a
on b.entid = a.entid;

Hadoop job information for Stage-1: number of mappers: 24; number of reducers: 17

结果分析:

hive启动了24个map函数,17个reduce函数。在hadoop中,一般默认的split切片小于等于blocksize(128Mb),如果是小文件的话(未进行小文件的合并)则每个小文件启动一个map函数。而实际在hive中,并不是split的大小要小于等于blocksize,而是可以远大于blocksize。比如这里,4.4G文件表,按128Mb切片算的话,至少实际需要35个map,而实际只需要24个,平均每个map处理了187Mb的文件。这是为什么呢?

此外这里明明设置了set mapred.max.split.size=134217728,每个map最大split块是 128Mb,而实际为什么参数没有用呢?网上有很多关于这方面的文章,但是几乎都是转载抄袭,没有任何深入理解,或者深入剖析决定map个数的原因。

案例演示决定map个数的因素

其实决定map个数的因素有很多,比如文件是否压缩,压缩的后的文件是否支持切分,比如文件默认的inputfort格式,不同实现类的split算法也不同,那么map的个数调优方式也不同,下面按分类详细说明hive中决定map个数的因素与常见map调优的使用。

首先分类: 处理的文件是否压缩,且压缩算法是否支持文件的切分

  1. 文件使用了压缩,且压缩算法不支持文件切分

    hive>  set io.compression.codecs; --配置了哪些压缩算法
    io.compression.codecs=org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,com.hadoop.compression.lzo.LzoCodec,com.hadoop.compression.lzo.LzopCodec,org.apache.hadoop.io.compress.BZip2Codec
    hive> set hive.exec.compress.output;
    hive.exec.compress.output=true --是否开启压缩
    hive> set mapreduce.output.fileoutputformat.compress.codec; --使用的压缩算法
    mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.DefaultCodec

    我们知道hive中有些压缩算法是不支持文件切分的,如下我们使用的默认的deflate算法,就不支持文件切分。

    ------------------------使用不同参数执行上面代码产生的map个数-----------------------------
    --1.使用系统配置的默认值
    set mapred.max.split.size = 256000000; --
    set mapred.min.split.size = 256000000;
    Hadoop job information for Stage-1: number of mappers: 24; number of reducers: 17

    --2.降低系统默认值
    set mapred.max.split.size=134217728;
    set mapred.min.split.size=134217728;
    Hadoop job information for Stage-1: number of mappers: 24; number of reducers: 17

    --3.调高系统默认值
    set mapred.max.split.size=500000000;
    set mapred.min.split.size=256000000;
    Hadoop job information for Stage-1: number of mappers: 9; number of reducers: 17

    --4.调高系统默认值
    set mapred.max.split.size=1024000000;
    set mapred.min.split.size=1024000000;
    Hadoop job information for Stage-1: number of mappers:6 ; number of reducers: 17

    如上我们使用不同的参数配置,来运行上面同一段代码,看系统产生的map个数,细心的人会发现,当我们使用默认值是产生了24个map,平均每个map处理了187Mb文件,但当我们调低set mapred.max.split.size=134217728时(每个map最多处理128Mb),相应的map个数并没有增加,这是为什么呢?

    关于这个问题就要说到决定map个数的首要因素:文件是否启动压缩,且压缩算法是否支持文件切分了。因为这里文件存储使用默认的deflate算法,不支持文件切分,所以设置的参数split.size=134217728没有生效。因为每个map处理的splitsize实际上要大于等于每个文件存储的大小。这里每个文件存储的大小200Mb左右,所以每个map处理的最小尺寸要大于200Mb。

    而当我们将set mapred.max.split.size=102400000设置的很大时,为什么又可以控制map个数了呢?因为deflate压缩算法虽然不支持文件切分,但是可以进行文件合并哇。从hive0.5开始就默认map前进行小文件合并了。如下,我们使用的也是默认的开启map前文件合并。但是注意即使这里支持文件合并,也是基于文件块的整个文件块合并,而不是基于blocksize的block合并。

    hive> set hive.input.format; --hive0.5开始的默认值,这个值会影响map个数的控制
    hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat

    提示:

    通过上面分析总结,当hive需要处理的文件是压缩,且压缩算法不支持文件切分的时候,决定map个数的因素主要是文件块实际存储的大小,如果文件块本身很大,比如500Mb左右,那么每个map处理的splitsize至少要是500Mb左右。这个时候我们不能人为通过参数降低每个map的splitsize来增加map个数,只能通过增加splitsize,减少map个数。

    但是一般经验来说,每个map处理的splitsize最好是128Mb(等于blocksize),这样效率最高。所以这个时候如果我们想增加map个数,只能通过临时表或者insert …select的形式,通过参数配置将文件块重新存储成较小的文件块,然后再进行处理。反之,如果文件块本身很小,那么我们可以通过增加splitsize来减少map,进而调优提高程序的运行效率。

    总结:

    如果hive处理的文件是压缩模式,且压缩模式不支持文件切分,那么这个时候我们只能通过控制参数来减少map个数,而不能通过配置参数来增加map个数,所以Hive对于压缩不可切分文件的调优有限。可以首先通过hadoop fs -du -s -h命令查看文件的存储大小结果,然后根据实际进行调优。常用的配置参数如下:

    set hive.input.format = org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; --hive0.5开始就是默认值,执行map前进行小文件合并
    ----------------------------------------------------------------------
    set mapred.max.split.size=256000000
    set mapred.min.split.size=10000000
    set mapred.min.split.size.per.node=8000000 --每个节点处理的最小split
    set mapred.min.split.size.per.rack=8000000 --每个机架处理的最小slit.
    ------------------------------------------------------------------------
    1.注意一般来说这四个参数的配置结果大小要满足如下关系。
    max.split.size >= min.split.size >= min.size.per.node >= min.size.per.node

    2.这四个参数的作用优先级分别如下
    max.split.size <= min.split.size <= min.size.per.node <= min.size.per.node

    比如如下,同样上面的代码,我们将其参数设置如下,发现只启动了12map,故max.split.size没有起作用。
    当四个参数设置矛盾时,系统会自动以优先级最高的参数为准,进行计算
    set mapred.max.split.size=300000000;
    set mapred.min.split.size.per.node=300000000;
    set mapred.min.split.size.per.rack=300000000;
    Hadoop job information for Stage-1: number of mappers: 12; number of reducers: 17

    3.注意这四个参数可以选择性设置,可以选择性设置大小或者使用默认值,但仍遵循前两条规则。
  2. 文件未使用压缩,或压缩算法支持文件切分

    同样是上面那个4.4g文件,我们这个时候让其为非压缩模式,发现这个时候文件总大小为23.4G,存储为22个文件,平均每个文件大小都在1.1G左右。所以压缩有时候是个好东西。如下我们所有非压缩的性能测试基于此文件。

    [finance]$ hadoop fs -count hdfs://suninghadoop2/user/finance/hive/warehouse/fdm_tmp.db/company_liquidation_fgeics_company_ar_d
    1 22 25154158871 hdfs://suninghadoop2/user/finance/hive/warehouse/fdm_tmp.db/company_liquidation_fgeics_company_ar_d
    -----------------------------------------------------------------------------------------------------------------------------------
    [finance]$ hadoop fs -du -s -h hdfs://suninghadoop2/user/finance/hive/warehouse/fdm_tmp.db/company_liquidation_fgeics_company_ar_d
    23.4 G hdfs://suninghadoop2/user/finance/hive/warehouse/fdm_tmp.db/company_liquidation_fgeics_company_ar_d
    -----------------------------------------------------------------------------------------------------------------------------------
    [finance]$ hadoop fs -du -h hdfs://suninghadoop2/user/finance/hive/warehouse/fdm_tmp.db/company_liquidation_fgeics_company_ar_d
    1.1 G hdfs://suninghadoop2/user/finance/hive/warehouse/fdm_tmp.db/company_liquidation_fgeics_company_ar_d/000000_0
    1.1 G hdfs://suninghadoop2/user/finance/hive/warehouse/fdm_tmp.db/company_liquidation_fgeics_company_ar_d/000001_0
    ...............................

    若这时set hive.input.format为HiveInputFormat

    hive> set hive.input.format; --从hive0.5就默认是CombineHiveInputFormat,所以这个用的不多
    hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;

    如果这时set hive.input.format为HiveInputFormat,这个时候有以下三个属性值来确定InputSplit的个数:

    set mapred.map.tasks=2
    set mapred.max.split.size=256000000
    set mapred.min.split.size=10000000
    set dfs.block.size=134217728 --128Mb
    --------------------------------------------------------------------------------------
    1. goalSize:该值由 totalSize/numSplits totlasize文件块大小,numslplits=mapred.map.tasks=2
    2. minSize:由配置参数 mapred.min.split.size(或者新版本的mapreduce.input.fileinputformat.split.minsize)mapred.min.split.size=10000000
    决定的 InputFormat的最小长度,默认为1
    3.blockSize:HDFS 中的文件存储块block的大小,默认为128MB。

    这三个参数决定一个 InputFormat 分片的最终的长度,计算方法如下:
    splitSize = max{minSize,min{goalSize,blockSize}}

    总结:

    当hive处理的文件是非压缩或者压缩可切分,且hive.input.format为HiveInputFormat时,这个时候决定map个数的参数主要是splitSize = max{minSize,min{goalSize,blockSize}} ,只有这个时候一般map的splitsize小于等于blocksize(128Mb)。但其实这种方式现在企业实际开发中已经使用的很少了。

    若hive.input.format为默认CombineHiveInputFormat

    如下,使用默认的map函数split参数,发现未压缩的23.4g的文件(22个)这里共使用了112个map函数,符合参数的设置结果set mapred.max.split.size=256000000 ; 23.4*1024/112=187Mb<256000000。

    --使用默认配置参数,实现对非压缩文件的操作
    set mapred.max.split.size=256000000 ;
    set mapred.min.split.size=10000000;

    drop table IF EXISTS fdm_tmp.company_liquidation_fgeics_company_ar_d_tmp;
    create table fdm_tmp.company_liquidation_fgeics_company_ar_d_tmp
    as
    select
    a.id
    ,a.entid
    ,a.ancheyear
    ,b.liqmen
    ,b.ligprincipal
    ,a.regno
    ,a.tel
    ,a.postalcode
    ,a.dom
    ,a.email
    ,a.busst
    ,a.empnum
    ,a.name
    ,a.updated
    ,b.etl_time
    from fdm_tmp.t_fgeics_company_liquidation_d_tmp b
    right join fdm_tmp.company_liquidation_fgeics_company_ar_d a
    on b.entid = a.entid;

    Hadoop job information for Stage-1: number of mappers: 112; number of reducers: 85
    Time taken: 152.679 seconds

    如下,将上面默认的参数对应调小,运行同样上面的代码,看map数是否有增加? 很显然,我们通过设置max.split.size的值实现了增加map个数的功能。这里map的个数由122个数变成了199个,平均每个map处理数据120Mb左右。但是运行时间却变慢了很多,时间由153s变成了243s。所以map的split大小并不是要接近blocksize才高效,这主要跟集群的性能配置有关。

    --将map的split参数最大值设置为128Mb.
    set mapred.max.split.size=134217728 ;
    set mapred.min.split.size=10000000;

    Hadoop job information for Stage-1: number of mappers: 199; number of reducers: 85
    Time taken: 243.038 seconds

    如下,将上面的默认参数增加,运行同样的代码,结果虽然我们将maxsplitsize设置的特别大,但是对应map的个数并没有对应的成倍减少,如果按最大值算应该在20多个map,而实际不是这样。这说明,光设置最大值是没有用的,这只是一个峰值,还有对应的设置最小值。

    --只将max.split.size设置的特别大,min.split.size还是10Mb左右。
    set mapred.max.split.size=1024000000;
    set mapred.min.split.size= 10000000;
    Hadoop job information for Stage-1: number of mappers: 106; number of reducers: 85

    map的多个参数配合使用,精确控制map的个数

     
    --1.只将max.split.size设置的特别大,且将 min.split.size使用默认值,发现map个数没有成倍减少。
    set mapred.max.split.size=1024000000;
    set mapred.min.split.size=10000000;
    Hadoop job information for Stage-1: number of mappers: 106; number of reducers: 85
    ------------------------------------------------------------------------------------
    --2.同时将max.split.size设置的特别大,且将 min.split.size同时设置很大为256Mb左右
    但是发现map的个数并没有减少,还是和上面一样,这说明控制map的个数还有别的因素
    set mapred.max.split.size=1024000000;
    set mapred.min.split.size=256000000;
    Hadoop job information for Stage-1: number of mappers: 106; number of reducers: 85
    ------------------------------------------------------------------------------------
    ---3.配合min.split.size.per.node使用,发现map个数仍然没有减少

    set mapred.max.split.size=1024000000;
    set mapred.min.split.size= 256000000;
    set mapred.min.split.size.per.node=256000000;--默认值是800000
    set mapred.min.split.size.per.rack=800000;---默认值
    --------------------------------------------------------------------------------------
    ---4.配合min.split.size.per.rack使用,map个数精准减少了,每个map处理的数据在256和1024之间
    Hadoop job information for Stage-1: number of mappers: 106; number of reducers: 85
    set mapred.max.split.size=1024000000;
    set mapred.min.split.size= 256000000;
    set mapred.min.split.size.per.node=256000000;
    set mapred.min.split.size.per.rack=256000000;

    Hadoop job information for Stage-1: number of mappers: 88; number of reducers: 85

    总结:

    如果Hive处理的的文件为非压缩格式或者压缩可切分,且inputFormat为CombineHiveInputFormat时,则控制map个数是由以下四个参数起作用,关于这四个参数作用优先级与使用注意事项请参考如下。

    mapred.min.split.size 或者 mapreduce.input.fileinputformat.split.minsize。
    mapred.max.split.size 或者 mapreduce.input.fileinputformat.split.maxsize。
    mapred.min.split.size.per.rack 或者 mapreduce.input.fileinputformat.split.minsize.per.rack。
    mapred.min.split.size.per.node 或者 mapreduce.input.fileinputformat.split.minsize.per.node。

    set hive.input.format = org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; --hive0.5开始就是默认值,执行map前进行小文件合并
    ----------------------------------------------------------------------
    set mapred.max.split.size=256000000 --集群默认值
    set mapred.min.split.size=10000000 --集群默认值
    set mapred.min.split.size.per.node=8000000 --每个节点处理的最小split
    set mapred.min.split.size.per.rack=8000000 --每个机架处理的最小slit.
    ------------------------------------------------------------------------
    1.注意一般来说这四个参数的配置结果大小要满足如下关系。
    max.split.size >= min.split.size >= min.size.per.node >= min.size.per.node

    2.这四个参数的作用优先级分别如下
    max.split.size <= min.split.size <= min.size.per.node <= min.size.per.node

    比如如下,同样上面的代码,我们将其参数设置如下,发现只启动了12map,故max.split.size没有起作用。
    当四个参数设置矛盾时,系统会自动以优先级最高的参数为准,进行计算
    set mapred.max.split.size=300000000;
    set mapred.min.split.size.per.node=300000000;
    set mapred.min.split.size.per.rack=300000000;
    Hadoop job information for Stage-1: number of mappers: 12; number of reducers: 17

    3.注意这四个参数可以选择性设置,可以选择性设置大小或者使用默认值,但仍遵循前两条规则。

    所以如果对于Hive调优,想通过控制map个数进行调优,首先确定集群是否启动了压缩,且压缩的算法是否直接文件切分,然后再确定集群配置的默认的hive.input.format是什么实现类,不同实现类对于split的算法不同,当然控制map的参数也不同。所以对于控制map个数调优远远不是网上很多人说的那么简单。

Author: Tunan
Link: http://yerias.github.io/2021/06/30/hive/32/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.