如何控制reduce个数与参数调优

相比map个数的控制复杂性,reduce个数的设定要相对简单多了,reduce的个数一般最后决定了输出文件的个数,二者相等,如果想多输出文件的个数(这样文件变小,但有可能程序变慢),那么可以人为增加reduce个数。如果想减少文件个数,也可以手动较少reduce个数(同样可能程序变慢)。但实际开发中,reduce的个数一般通过程序自动推定,而不人为干涉,因为人为控制的话,如果使用不当很容易造成结果不准确,且降低执行效率。

控制reduce个数的方式与参数

首先可以通过参数直接控制最终reduce的个数,使用参数mapred.reduce.tasks

hive> set mapred.reduce.tasks ;
mapred.reduce.tasks=-1 --我们使用的官方默认值-1,表示不人为设置reduce的个数,实际这种方式用的也少。

在hive中如果不指定reduce个数的情况下,Hive会猜测确定一个reduce个数,基于以下两个设定

1.set hive.exec.reducers.bytes.per.reducer=300000000  --我们默认值 300Mb
注意:在hive 0.14.0之前默认hive.exec.reducers.bytes.per.reducer默认值是1Gb,每个reduce最多处理1Gb。
但是在之后版本默认值都是256Mb.这里我们用的是300Mb。为什么300Mb写的是300*1000*1000?因为网络传输中用的1000,而不是1024机制。

2.set hive.exec.reducers.max=1009 --我们默认值,这个值一般不会修改。
注意,在hive 0.14.0之前默认是999,之后是1009,所以我们的也是官方默认值。

3.reduce计算方式:计算reducer数的公式很简单
Num=min(hive.exec.reducers.max2,map输出数据量/hive.exec.reducers.bytes.per.reducer)

案例演示控制reduce个数的方法

  1. 数据准备

    这里文件大表的大小为23.4G,存储为22个文件,平均每个文件大小都在1.1G左右。小表的文件2个,合计58Mb.

    [finance]$ hadoop fs -count hdfs://suninghadoop2/user/finance/hive/warehouse/fdm_tmp.db/company_liquidation_fgeics_company_ar_d
    1 22 25154158871 hdfs://suninghadoop2/user/finance/hive/warehouse/fdm_tmp.db/company_liquidation_fgeics_company_ar_d
    -----------------------------------------------------------------------------------------------------------------------------------
    [finance]$ hadoop fs -du -s -h hdfs://suninghadoop2/user/finance/hive/warehouse/fdm_tmp.db/company_liquidation_fgeics_company_ar_d
    23.4 G hdfs://suninghadoop2/user/finance/hive/warehouse/fdm_tmp.db/company_liquidation_fgeics_company_ar_d
    -----------------------------------------------------------------------------------------------------------------------------------
    [finance]$ hadoop fs -du -h hdfs://suninghadoop2/user/finance/hive/warehouse/fdm_tmp.db/company_liquidation_fgeics_company_ar_d
    1.1 G hdfs://suninghadoop2/user/finance/hive/warehouse/fdm_tmp.db/company_liquidation_fgeics_company_ar_d/000000_0
    1.1 G hdfs://suninghadoop2/user/finance/hive/warehouse/fdm_tmp.db/company_liquidation_fgeics_company_ar_d/000001_0
    ...............................
  2. 通过hive.exec.reducers.bytes.per.reducer控制reduce个数

    使用系统默认的配置参数,系统自动计算reduce个数,这里需要85个reduce。85*300>23.4G。程序运行时间为123S.

    set mapred.max.split.size=256000000  ;--默认值
    set mapred.min.split.size=10000000 ; ----默认值
    set hive.exec.reducers.bytes.per.reducer = 300000000; ----默认值

    drop table IF EXISTS fdm_tmp.company_liquidation_fgeics_company_ar_d_tmp;
    create table fdm_tmp.company_liquidation_fgeics_company_ar_d_tmp
    as
    select
    a.id
    ,a.entid
    ,a.ancheyear
    ,b.liqmen
    ,b.ligprincipal
    ,a.regno
    ,a.tel
    ,a.postalcode
    ,a.dom
    ,a.email
    ,a.busst
    ,a.empnum
    ,a.name
    ,a.updated
    ,b.etl_time
    from fdm_tmp.t_fgeics_company_liquidation_d_tmp b
    right join fdm_tmp.company_liquidation_fgeics_company_ar_d a
    on b.entid = a.entid;

    Hadoop job information for Stage-1: number of mappers: 112; number of reducers: 85
    Time taken: 123.945 seconds

    增大hive.exec.reducers.bytes.per.reducer = 1000000000; 发现需要的reduce变成了26个,减少了一大半。且满足26*1Gb>23.4G。但程序运行时间增加将近一半237s.

    set mapred.max.split.size=256000000;
    set mapred.min.split.size=10000000;
    set hive.exec.reducers.bytes.per.reducer = 1000000000;

    Hadoop job information for Stage-1: number of mappers: 112; number of reducers: 26
    Time taken: 237.104 seconds

    降低hive.exec.reducers.bytes.per.reducer = 128000000; 发现需要的reduce变成了,增加到197个。执行时间为141s.

    set mapred.max.split.size=256000000;
    set mapred.min.split.size=10000000;
    set hive.exec.reducers.bytes.per.reducer = 128000000;
    Hadoop job information for Stage-1: number of mappers: 112; number of reducers: 197
    Time taken: 141.371 seconds
  3. 通过mapred.reduce.tasks直接设置reduce个数(慎用)

    如下,手动设置reduce的个数为100个,则reduce的个数就是100个,每个reduce处理的数据按总数据量 /100.

    set mapred.max.split.size=256000000;
    set mapred.min.split.size=10000000;
    set mapred.reduce.tasks=100;

    Hadoop job information for Stage-1: number of mappers: 112; number of reducers: 100

总结:

同map函数一样,启动和初始化reduce也会消耗时间和资源。所以reduce的个数不宜过多,且reduce的个数决定了最终输出文件的个数,如果reduce个数过多则会产生很多小文件,对于以后的计算也会降低效率。

当然reduce个数如果过少,也会造成单个reduce处理数据量太大也会影响程序的效率。所以一般reduce的个数最好让程序自己去推定与计算。

那么什么时候可以进行手动设定reduce数量呢?比如系统自动计算的reduce个数,因为集群资源不足,造成程序运行出现OOM(内存溢出不足)时,可以根据推定的reduce个数手动增加数量,保证程序在跑的慢的基础上可以完整运行。

reduce的特殊情况,程序只有一个reduce

什么情况下程序只有一个reduce呢?一般map输出的结果值小于hive.exec.reducers.bytes.per.reducer值时,程序只有一个reduce没有问题,其次手动设置mapred.reduce.tasks=1,这个时候程序只有一个reduce。那么正常除了这两种情况下,正常程序还有什么情况下只有 一个reduce呢?

  1. 代码用了order by ,因为order by是全局排序

    drop table IF EXISTS fdm_tmp.company_liquidation_fgeics_company_ar_d_tmp;
    create table fdm_tmp.company_liquidation_fgeics_company_ar_d_tmp
    as
    select
    a.id
    ,a.entid
    ,a.ancheyear
    ,b.liqmen
    ,b.ligprincipal
    ,a.regno
    ,a.tel
    ,a.postalcode
    ,a.dom
    ,a.email
    ,a.busst
    ,a.empnum
    ,a.name
    ,a.updated
    ,b.etl_time
    from fdm_tmp.t_fgeics_company_liquidation_d_tmp b
    right join fdm_tmp.company_liquidation_fgeics_company_ar_d a
    on b.entid = a.entid
    order by a.id ;

    Hadoop job information for Stage-1: number of mappers: 112; number of reducers: 85
    Hadoop job information for Stage-2: number of mappers: 81; number of reducers: 1

    order by 全局排序针对所有的数据,所以最终所有的数据都要分发给一个reduce去处理。比如这里23.4G文件使用order by,系统 启动了两个MR程序执行。最终使用一个reduce进行排序,极度慢,慎用。

  2. 表关联时出现了笛卡尔乘积

    drop table IF EXISTS fdm_tmp.company_liquidation_fgeics_company_ar_d_tmp;
    create table fdm_tmp.company_liquidation_fgeics_company_ar_d_tmp
    as
    select
    a.id
    ,a.entid
    ,a.ancheyear
    ,b.liqmen
    ,b.ligprincipal
    ,a.regno
    ,a.tel
    ,a.postalcode
    ,a.dom
    ,a.email
    ,a.busst
    ,a.empnum
    ,a.name
    ,a.updated
    ,b.etl_time
    from fdm_tmp.t_fgeics_company_liquidation_d_tmp b
    right join fdm_tmp.company_liquidation_fgeics_company_ar_d a
    on True;
    Hadoop job information for Stage-1: number of mappers: 112; number of reducers: 1

    表之间进行join时,如果出现 了笛卡尔乘积,需要全局处理,所以MR不得不使用 一个reduce去处理,所以要尽量避免出现这种 情况。

  3. 使用count要注意

    --1.单独使用count(1),count(*)要比count(id)快。
    因为列的引用加入增加了序列化和反序列化的问题。但是这时候count仍然只有1个reduce.
    add jar /home/bigdata/software/hive/ext-lib/suning-hive-inputformat.jar;
    select count(1)
    from fdm_ssa.t_fgeics_company_ar_d
    WHERE statis_date = '20190718' ;
    Hadoop job information for Stage-1: number of mappers: 248; number of reducers: 1
    OK
    158746942
    Time taken: 46.182 seconds, Fetched: 1 row(s)
    -------------------------------------------------------------------------------

    --2.改写加入group by ,稍微快一些,这时候reduce个数不是1.而是129个。

    add jar /home/bigdata/software/hive/ext-lib/suning-hive-inputformat.jar;
    select
    statis_date,
    count(1)
    from fdm_ssa.t_fgeics_company_ar_d
    WHERE statis_date = '20190718'
    group by statis_date
    ;

    Hadoop job information for Stage-1: number of mappers: 248; number of reducers: 129
    OK
    20190718 158746942
    Time taken: 44.167 seconds, Fetched: 1 row(s)
    ---------------------------------------------------------------------------------
Author: Tunan
Link: http://yerias.github.io/2021/07/04/hive/33/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.