大表Join大表&大表Join小表&group By解决数据倾斜

目录

  1. 大表Join大表
  2. 大表Join小表
  3. group By解决

大表Join大表

思路一:SMBJoin

smb是sort merge bucket操作,首先进行排序,继而合并,然后放到所对应的bucket中去,bucket是hive中和分区表类似的技术,就是按照key进行hash,相同的hash值都放到相同的buck中去。在进行两个表联合的时候。我们首先进行分桶,在join会大幅度的对性能进行优化。也就是说,在进行联合的时候,是bukect中的一小部分和bukect中的一小部分进行联合,table联合都是等值连接,相同的key都放到了同一个bucket中去了,那么在联合的时候就会大幅度的减小无关项的扫描。

  1. 设置参数

    set hive.auto.convert.sortmerge.join=true;
    set hive.optimize.bucketmapjoin = true;
    set hive.optimize.bucketmapjoin.sortedmerge = true;
    set hive.auto.convert.sortmerge.join.noconditionaltask=true;
  2. 两个表的bucket数量相等

  3. Bucket列、Join列、Sort列、Skewed列为相同的字段

  4. 必须是应用在bucket mapjoin 的场景中

  5. 注意点

    hive并不检查两个join的表是否已经做好bucket且sorted,需要用户自己去保证join的表,否则可能数据不正确。有两个办法

    • hive.enforce.sorting 设置为true

    • 手动生成符合条件的数据,通过在sql中用distributed c1 sort by c1 或者 cluster by c1,表创建时必须是CLUSTERED且SORTED,如下

    • 创建Skewed Table提高有一个或多个列有倾斜值的表的性能,例如:

      create table test_smb_2(mid string,age_id string)
      CLUSTERED BY(mid) SORTED BY(mid) INTO 500 BUCKETS
      SKEWED BY(mid) on (001)
  6. 案例

    1. 设置先关参数

      set hive.auto.convert.sortmerge.join=true;
      set hive.optimize.bucketmapjoin = true;
      set hive.optimize.bucketmapjoin.sortedmerge = true;
      set hive.auto.convert.sortmerge.join.noconditionaltask=true;
    2. 创建桶表

      user表

      create table user_info_bucket(userid string,uname string)
      clustered by(userid) into 4 buckets
      SKEWED BY(userid) on (001)
      row format delimited fields terminated by "\t";
      STORED AS DIRECTORIES

      domain表

      create table domain_info_bucket(userid string,domainid string,domain string)
      clustered by (userid) into 4 buckets
      SKEWED BY(userid) on (001)
      row format delimited fields terminated by "\t";
      STORED AS DIRECTORIES
    3. 分别倒入数据

      insert overwrite table user_info_bucket select userid ,uname from user
      insert overwrite table domain_info_bucket select userid ,domainid,domain from doamin
    4. 查询

      select * from user_info_bucket u  join domain_info_bucket d on(u.userid==d.userid)

思路二:一分为二

选择临时表的方式,将数据一分为二,把倾斜的key,和不倾斜的key分开处理,不倾斜的正常join,倾斜的根据情况选择mapjoin或加盐处理,最后结果union all结果

user表为用户基本表,domain为用户访问域名的宽表

注意:我们其实隐含使用到了mapjoin,hive中的参数为set hive.auto.convert.join=true;,自动开启,默认25M,不能超过1G。

  1. 创建中间表

    create table tmp_table(userid string,uname string)
    SKEWED BY(userid) on (001)
    row format delimited fields terminated by "\t";
    STORED AS DIRECTORIES
  2. count(*)出符合倾斜条件的数据存入中间表

    insert overwrite table tmp_table
    select
    d.userid,u.uname
    from
    (select userid
    from
    (select
    userid,
    count(userid) u_cunt

    from
    domain
    group by
    userid) t
    where u_cunt>100) d
    left outer join
    (select
    userid,uname
    from
    user) u
    on d.userid = u.userid;
  3. 一分为二,分别查询中出结果,再union all

    select 
    u1.userid,u1.uname,d1.domain
    from
    (select
    userid,uname
    from
    user) u1
    join
    (select
    d.userid,d.domain
    from
    domain d
    left outer join
    tmp_table t
    on
    d.userid = t.userid
    where
    t.userid is null) d1
    on u1.userid = d1.userid

    union all

    select
    u2.userid,u2.uname,d2.domain
    from
    (select
    userid,uname
    from
    user) u2
    join
    (select
    d.userid,d.domain
    from
    domain d
    left outer join
    tmp_table t
    on
    d.userid = t.userid
    where
    t.userid is not null) d2
    on u2.userid = d2.userid

大表Join小表

思路:MapJoin

大表Join小表很好解决,把小表放进内存,大表再去匹配即可。

思路:

  1. 开启MapJoin

    set hive.auto.convert.join=true;
  2. 调整MapJoin小表大小,默认25M(调整为可以容忍的大小)

    set hive.mapjoin.smalltable.filesize
  3. 如果是MR,小表放进Map,大表进入Mapper匹配Map(使用对象存储结果)

group By解决

思路:加盐去盐

  1. 开启数据倾斜时的负载均衡

    set hive.groupby.skewindata=true
  2. 对groupby的key加盐去盐

    开启上面这个参数,hive会自动拆解成两个MR,加盐去盐,最终输出结果

    MR需要写两个MR,一个对key加盐,一个对key去盐

Author: Tunan
Link: http://yerias.github.io/2018/11/09/hive/10/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.