Data Skew & Solving Data Skew with an MR Chain & Reduce Join for Large and Small Tables & Map Join for Large and Small Tables & SQL Execution Plans

Goals

  1. Data skew
  2. Solving data skew with an MR chain
  3. Reduce join for large and small tables
  4. Map join for large and small tables
  5. SQL execution plans

Data Skew

  1. What causes data skew

    MapReduce sends every record with the same map-side key to the same reducer, where the records are aggregated. Between map and reduce sits the shuffle, which partitions the map output by key so that all records sharing a key land in a single reduce task. Data skew happens when the keys are distributed very unevenly: a few reduce tasks receive a huge share of the data while the rest receive very little.

  2. Problems caused by data skew

    • One or more reducers appear to be stuck

    • Containers fail with OOM errors

    • The skewed reducers read and write far more data than the other, normal reducers

    • Along with the skew, jobs get killed and other odd symptoms appear

  3. Causes and remedies

    Causes:

    • A single key value has a very large number of records (1. reducer memory is limited; 2. the oversized task can destabilize other jobs running on the cluster)
    • There are many distinct key values (the records of any single value still fit within the memory allocated to a reducer, but there are too many values overall)

    Remedies:

    • Increase the number of reducers

    • Use a custom partitioner (a short sketch follows this list)

    • Increase the reducer JVM memory (usually not very effective)

    • In the map phase, split the skewed keys into several groups by appending a random prefix (salt), then strip the salt in the reduce phase

    • Solve the skew at the business/data level

      We try to solve it from the design side:

      • Pre-process the data and filter out abnormal values
      • Scatter the data to raise the parallelism, then aggregate again
    • Platform-level optimizations

      • For joins, use a map join so the join is already done on the map side instead of stalling in the reduce phase
      • When a group-by can be done first, do it first: reduce the keys once, then run count or distinct count on the result
      • Enable compression for map output and intermediate results
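
    A short sketch of the custom-partitioner remedy, assuming a single known hot key (the key name and the class itself are illustrative, not from the original post). Hadoop's default HashPartitioner routes each record with (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks, so a hot key always lands on the same reducer; a custom partitioner can spread that key out, at the cost of a second pass to merge its partial results. The types match the Text/IntWritable map output used in the examples in this post:

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;

    import java.util.Random;

    // Sketch only: spread one known hot key over all reducers,
    // while every other key keeps the normal hash-based routing.
    public class SkewAwarePartitioner extends Partitioner<Text, IntWritable> {

        private static final String HOT_KEY = "www.baidu.com"; // assumed hot key, for illustration
        private final Random random = new Random();

        @Override
        public int getPartition(Text key, IntWritable value, int numPartitions) {
            if (HOT_KEY.equals(key.toString())) {
                // Hot key: pick a random reducer so its records are spread out.
                // (A downstream job must then merge the partial results.)
                return random.nextInt(numPartitions);
            }
            // Normal keys: same formula as Hadoop's default HashPartitioner.
            return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }

    Register it with job.setPartitionerClass(SkewAwarePartitioner.class). The compression remedy in the last bullet above is normally switched on through the mapreduce.map.output.compress and mapreduce.map.output.compress.codec properties.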

Solving Data Skew with an MR Chain

Core idea: the first MapReduce job salts the skew-prone keys with a random prefix and aggregates the salted keys; the second MapReduce job strips the salt from the first job's output and aggregates again. The drawback is the extra IO of running two MR jobs.

Reference code:

package com.tunan.item;

import com.tunan.hadoop.utils.FileUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.Random;

/**
 * @description:
 * @author: tunan
 * @create: 2020-02-04 14:50
 * @since: 1.0.0
 **/
public class ChainMRDriver extends Configured implements Tool {

    private static Random r;
    String in = "data/skew/access.txt";
    String out1 = "out/mr1";
    String out2 = "out/mr2";

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        int run = ToolRunner.run(conf, new ChainMRDriver(), args);
        System.exit(run);
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = super.getConf();
        FileUtil.checkFileIsExists(conf, out1);
        FileUtil.checkFileIsExists(conf, out2);

        Job job1 = Job.getInstance(conf);
        Job job2 = Job.getInstance(conf);

        job1.setJarByClass(ChainMRDriver.class);
        job2.setJarByClass(ChainMRDriver.class);

        ChainMapper.addMapper(job1, ChainMRDriver.incRanDomMapper.class, LongWritable.class, Text.class, Text.class, IntWritable.class, conf);
        ChainReducer.setReducer(job1, ChainMRDriver.incRanDomReduver.class, Text.class, IntWritable.class, Text.class, IntWritable.class, conf);

        ChainMapper.addMapper(job2, ChainMRDriver.decRanDomMapper.class, LongWritable.class, Text.class, Text.class, IntWritable.class, conf);
        ChainReducer.setReducer(job2, ChainMRDriver.decRanDomReduver.class, Text.class, IntWritable.class, Text.class, IntWritable.class, conf);

        FileInputFormat.setInputPaths(job1, new Path(in));
        FileOutputFormat.setOutputPath(job1, new Path(out1));
        FileInputFormat.setInputPaths(job2, new Path(out1));
        FileOutputFormat.setOutputPath(job2, new Path(out2));

        // Submit job1 and then job2; job1 --> job2 must run in this order
        System.out.println("============= Stage 1 ==============");
        boolean b = job1.waitForCompletion(true);
        if (b) {
            System.out.println("============= Stage 2 ==============");
            boolean b1 = job2.waitForCompletion(true);
            return b1 ? 0 : 1;
        }
        return 1;
    }

    public static class incRanDomMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Create the Random instance
            r = new Random();
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Read a line such as "www.baidu.com<TAB>2" and salt the key
            String[] line = value.toString().split("\t");
            String incR = r.nextInt(10) + "_" + line[0];
            int number = Integer.parseInt(line[1]);
            context.write(new Text(incR), new IntWritable(number));
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            if (r != null) {
                // Release the Random instance
                r = null;
            }
        }
    }

    public static class incRanDomReduver extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sums = 0;
            for (IntWritable sum : values) {
                sums += sum.get();
            }
            context.write(key, new IntWritable(sums));
        }
    }

    public static class decRanDomMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        // Strip the salt, then aggregate again

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] line = value.toString().split("\t");
            String decWord = line[0].split("_")[1];
            context.write(new Text(decWord), new IntWritable(Integer.parseInt(line[1])));
        }
    }

    public static class decRanDomReduver extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sums = 0;
            for (IntWritable sum : values) {
                sums += sum.get();
            }
            context.write(key, new IntWritable(sums));
        }
    }
}

SQL Execution Plans

How to view the execution plan of a SQL statement

EXPLAIN [EXTENDED] Syntax
EXPLAIN select e.empno,e.ename,d.deptno,d.dname from emp e join dept d on e.deptno = d.deptno;

Walking through the execution plan of this SQL statement

+-----------------------------------------------------------------+--+
Explain
+-----------------------------------------------------------------+--+
STAGE DEPENDENCIES:                       // stage dependencies
  Stage-4 is a root stage                 // Stage-4 is the root stage
  Stage-3 depends on stages: Stage-4      // Stage-3 depends on Stage-4
  Stage-0 depends on stages: Stage-3      // Stage-0 depends on Stage-3

STAGE PLANS:                              // per-stage plans
  Stage: Stage-4                          // stage 4
    Map Reduce Local Work                 // a local task
      Alias -> Map Local Tables:          // the map-side local table has alias d, i.e. the dept table
        d
          Fetch Operator                  // fetch
            limit: -1                     // limit is -1, i.e. all rows are read
      Alias -> Map Local Operator Tree:   // map-side local operator tree
        d
          TableScan                       // table scan
            alias: d                      // alias d
            Statistics: Num rows: 2 Data size: 284 Basic stats: PARTIAL Column stats: NONE    // statistics
            Filter Operator               // filter
              predicate: deptno is not null (type: boolean)    // predicate: the deptno column is not null
              Statistics: Num rows: 1 Data size: 142 Basic stats: COMPLETE Column stats: NONE // statistics
              HashTable Sink Operator     // the output is sunk into a hash table
                keys:
                  0 deptno (type: int)
                  1 deptno (type: int)

  Stage: Stage-3                          // stage 3
    Map Reduce
      Map Operator Tree:                  // map operator tree
          TableScan                       // table scan
            alias: e                      // alias e, i.e. the emp table
            Statistics: Num rows: 6 Data size: 657 Basic stats: COMPLETE Column stats: NONE   // statistics
            Filter Operator               // filter
              predicate: deptno is not null (type: boolean)    // predicate: the deptno column is not null
              Statistics: Num rows: 3 Data size: 328 Basic stats: COMPLETE Column stats: NONE // statistics
              Map Join Operator           // map join
                condition map:            // join condition map
                     Inner Join 0 to 1
                keys:
                  0 deptno (type: int)
                  1 deptno (type: int)
                outputColumnNames: _col0, _col1, _col11, _col12    // internal positions of the selected columns
                Statistics: Num rows: 3 Data size: 360 Basic stats: COMPLETE Column stats: NONE // statistics
                Select Operator           // select
                  expressions: _col0 (type: int), _col1 (type: string), _col11 (type: int), _col12 (type: string) // expressions
                  outputColumnNames: _col0, _col1, _col2, _col3    // internal positions of the output columns
                  Statistics: Num rows: 3 Data size: 360 Basic stats: COMPLETE Column stats: NONE // statistics
                  File Output Operator    // file output
                    compressed: false     // compressed: no
                    Statistics: Num rows: 3 Data size: 360 Basic stats: COMPLETE Column stats: NONE // statistics
                    table:                // the table's input format, output format and serde
                        input format: org.apache.hadoop.mapred.TextInputFormat                     // input format
                        output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat  // output format
                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe                  // serde
      Local Work:
        Map Reduce Local Work

  Stage: Stage-0                          // stage 0
    Fetch Operator
      limit: -1                           // the value set by LIMIT (-1 means no limit)
      Processor Tree:
        ListSink

+-----------------------------------------------------------------+--+

The execution plan shows that, under the hood, Hive executes this SQL as MapReduce.

The SQL joins two tables, emp and dept, and selects a few columns from each. The execution plan is split into three stages: Stage-4, Stage-3 and Stage-0.

Stage-4 is the root stage, Stage-3 depends on Stage-4, and Stage-0 depends on Stage-3.

Stage-4 is a local task. It reads the dept table and builds a hash table whose key is deptno; in the plan this shows up as "0 deptno" and "1 deptno", i.e. deptno is the join key of the underlying MapReduce job.

Stage-3 is the map phase of the MapReduce job. It scans the emp table and performs a Map Join with the condition that the two tables' deptno columns are equal (an inner join). The result is a wide row containing columns from both tables; the Select operator then picks out the required columns by position and type, and the File Output operator writes them out, recording whether compression is enabled and which input, output and serde formats are used.

Stage-0 fetches the number of records specified by LIMIT (here -1, meaning all rows).

Summary: this SQL never runs a reduce phase. Current Hive versions set hive.auto.convert.join (automatically convert joins to map joins) to true by default, working together with hive.mapjoin.smalltable.filesize (the maximum file size of a "small" table), which defaults to 25 MB. A table smaller than 25 MB is treated as a small table and the join is automatically converted to a map join: the small table is loaded into the Hadoop distributed cache and handed to every map task that joins against the large table. The large and small tables are joined on the key into one wide row, the columns required by the SELECT are picked out, and finally the number of records set by LIMIT is returned.

Reference parameters:

-- whether to automatically convert joins to map joins
set hive.auto.convert.join = true;
-- maximum file size of a small table, default 25000000 (25 MB)
set hive.mapjoin.smalltable.filesize = 25000000;
-- whether to merge multiple map joins into one
set hive.auto.convert.join.noconditionaltask = true;
-- when merging multiple map joins into one, the maximum total size of all the small tables
set hive.auto.convert.join.noconditionaltask.size = 10000000;

Reduce Join for Large and Small Tables (emp, dept)

The core idea of a reduce join is to define the output columns as an entity class (POJO) that serves as the output value, with a flag field in the class that records which table each record came from.

  1. Use the column that the SQL join would use as the MapReduce key for both tables, because the MapReduce key drives sorting and partitioning.

  2. In the mapper, get the file name of the current input split from the context, read the file line by line, populate the entity object according to which file the line came from, and write it out as the map output.

  3. The reducer receives one group of values per key. Records with the same key from dept and from emp are shuffled into the same reduce call, so the group contains data from both tables. For each value, check the flag to see which table it came from: for an emp record, copy the fields into a newly created object and add it to a List; for a dept record, store its dname in a variable. Since dept is the small table and its deptno key is unique, after iterating over all values the List holds every emp record of this deptno and the variable holds the matching dname. Finally, loop over the List and set the dname on each object, so that every object has all of its fields populated, and write the objects out.
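
Both the reduce-join and map-join examples below rely on a JoinMain POJO (com.tunan.hadoop.pojo.JoinMain) that the post does not show. What follows is only a minimal sketch of what that class presumably looks like, reconstructed from how the drivers use it; the field names, constructors and flag convention are inferred, not taken from the original project:

package com.tunan.hadoop.pojo;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Sketch of the JoinMain entity used as the MapReduce value/output type.
// It implements Writable because the reduce join ships it through the shuffle as a map output value.
public class JoinMain implements Writable {

    private int empno;
    private String ename;
    private int deptno;
    private String dname;
    private int flag;   // 1 = record came from emp, 2 = record came from dept

    public JoinMain() { }

    // Constructor used by the map join (no flag needed, nothing goes through a reducer)
    public JoinMain(int empno, String ename, int deptno, String dname) {
        this(empno, ename, deptno, dname, 0);
    }

    // Constructor used by the reduce join
    public JoinMain(int empno, String ename, int deptno, String dname, int flag) {
        this.empno = empno;
        this.ename = ename;
        this.deptno = deptno;
        this.dname = dname;
        this.flag = flag;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(empno);
        out.writeUTF(ename);
        out.writeInt(deptno);
        out.writeUTF(dname);
        out.writeInt(flag);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        empno = in.readInt();
        ename = in.readUTF();
        deptno = in.readInt();
        dname = in.readUTF();
        flag = in.readInt();
    }

    // Getters and setters used by the reducer
    public int getEmpno() { return empno; }
    public void setEmpno(int empno) { this.empno = empno; }
    public String getEname() { return ename; }
    public void setEname(String ename) { this.ename = ename; }
    public int getDeptno() { return deptno; }
    public void setDeptno(int deptno) { this.deptno = deptno; }
    public String getDname() { return dname; }
    public void setDname(String dname) { this.dname = dname; }
    public int getFlag() { return flag; }
    public void setFlag(int flag) { this.flag = flag; }

    // TextOutputFormat writes the output key with toString(), so format the joined row here
    @Override
    public String toString() {
        return empno + "\t" + ename + "\t" + deptno + "\t" + dname;
    }
}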

Reference code:

package com.tunan.hadoop.join;

import com.tunan.hadoop.pojo.JoinMain;
import com.tunan.hadoop.utils.FileUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * @description:
 * @author: tunan
 * @create: 2020-01-29 16:39
 * @since: 1.0.0
 **/
public class ReduceJoinDriver extends Configured implements Tool {

    String in = "data/join/";
    String out = "out/";

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        int run = ToolRunner.run(conf, new ReduceJoinDriver(), args);
        System.exit(run);
    }

    @Override
    public int run(String[] strings) throws Exception {
        // Get the configuration
        Configuration conf = super.getConf();

        // Check the output directory (custom FileUtil)
        FileUtil.checkFileIsExists(conf, out);

        Job job = Job.getInstance(conf);

        // Set the driver class
        job.setJarByClass(ReduceJoinDriver.class);

        // Set the Mapper/Reducer classes
        job.setMapperClass(JoinMapper.class);
        job.setReducerClass(JoinReducer.class);

        // Set the map output types
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(JoinMain.class);

        //job.setNumReduceTasks(3);

        // Set the reduce output types
        job.setOutputKeyClass(JoinMain.class);
        job.setOutputValueClass(NullWritable.class);

        // Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(in));
        FileOutputFormat.setOutputPath(job, new Path(out));

        // Submit the job
        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }

    public static class JoinMapper extends Mapper<LongWritable, Text, IntWritable, JoinMain> {
        private String name;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileSplit fileSplit = (FileSplit) context.getInputSplit();
            name = fileSplit.getPath().getName();
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Decide which table this split belongs to
            if (name.contains("emp")) { // emp
                // Read the line
                String[] lines = value.toString().split("\t");
                if (lines.length == 8) {
                    // Pick out the needed fields
                    Integer empno = Integer.parseInt(lines[0].trim());
                    String ename = lines[1];
                    Integer deptno = Integer.parseInt(lines[lines.length - 1].trim());
                    // Emit with flag 1 (emp)
                    context.write(new IntWritable(deptno), new JoinMain(empno, ename, deptno, "", 1));
                }
            } else { // dept
                // Read the line
                String[] lines = value.toString().split("\t");
                if (lines.length == 3) {
                    int deptno = Integer.parseInt(lines[0].trim());
                    String dname = lines[1];
                    // Emit with flag 2 (dept)
                    context.write(new IntWritable(deptno), new JoinMain(0, "", deptno, dname, 2));
                }
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            super.cleanup(context);
        }
    }

    public static class JoinReducer extends Reducer<IntWritable, JoinMain, JoinMain, NullWritable> {
        // Core idea: each deptno group enters reduce() once, because the map output key is deptno
        @Override
        protected void reduce(IntWritable key, Iterable<JoinMain> values, Context context) throws IOException, InterruptedException {
            List<JoinMain> list = new ArrayList<>();
            String dname = "";
            // 1. Look at each record in the group and check its flag
            // 2. Collect all emp records into the list
            // 3. Store the dept record's dname in a variable
            // 4. All records of this deptno are in this group, so assign the dname
            // 5. Write out each record once the dname has been assigned
            for (JoinMain main : values) {
                // emp record
                if (main.getFlag() == 1) {
                    // Copy the emp fields into a new object (the framework reuses the value instance)
                    JoinMain m = new JoinMain();
                    m.setDeptno(main.getDeptno());
                    m.setEmpno(main.getEmpno());
                    m.setEname(main.getEname());
                    // Add it to the list
                    list.add(m);
                } else if (main.getFlag() == 2) { // dept record
                    // Remember the dept table's dname
                    dname = main.getDname();
                }
            }
            // Assign the dname to every emp record and write it out
            for (JoinMain bean : list) {
                bean.setDname(dname);
                context.write(bean, NullWritable.get());
            }
        }
    }
}

Map Join for Large and Small Tables (emp, dept)

The core idea of a map join is to load the small table into an in-memory map (via the cache); when the mapper reads each row of the large table, it looks up the small table (the map) by key and sets the retrieved value on the output object.

  1. In setup(), get the path of the cached small-table file from the context, open it with a character stream, read it line by line, split each line, and put the fields into a HashMap whose key and value are the join key and the looked-up column that map() will need for the join with the large table.

  2. In map(), read each line of the large table, look up the HashMap (the small table) by key, set the retrieved value together with the large-table fields on the output object, and write it out; the output value can be NullWritable.

Reference code:

package com.tunan.hadoop.join;

import com.tunan.hadoop.pojo.JoinMain;
import com.tunan.hadoop.utils.FileUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.*;
import java.net.URI;
import java.util.HashMap;

/**
 * @description:
 * @author: tunan
 * @create: 2020-02-01 23:10
 * @since: 1.0.0
 **/
public class MapJoinDriver extends Configured implements Tool {

    private String in = "data/join/emp.txt";
    private String out = "out";

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        int run = ToolRunner.run(conf, new MapJoinDriver(), args);
        System.exit(run);
    }

    /**
     * @param strings
     * @return : int
     * @describe : configure the job; no reducer needs to be set
     * @author : tunan
     * @date : 2020/2/1 23:14
     */
    @Override
    public int run(String[] strings) throws Exception {

        Configuration conf = super.getConf();
        FileUtil.checkFileIsExists(conf, out);

        Job job = Job.getInstance(conf);

        job.setJarByClass(MapJoinDriver.class);

        // Put the small table (dept) into the cache
        job.addCacheFile(new URI("data/join/dept.txt"));

        job.setMapperClass(MapperJoin.class);

        job.setOutputKeyClass(JoinMain.class);
        job.setOutputValueClass(NullWritable.class);

        // Map-only job: no reduce phase
        job.setNumReduceTasks(0);

        FileInputFormat.setInputPaths(job, new Path(in));
        FileOutputFormat.setOutputPath(job, new Path(out));

        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }

    private static class MapperJoin extends Mapper<LongWritable, Text, JoinMain, NullWritable> {
        private HashMap<Integer, String> hashMap = new HashMap<>();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Get the path of the cached file
            //String path = context.getCacheFiles()[0].getPath().toString();
            String path = context.getCacheFiles()[0].getPath();
            /*URI[] files = context.getCacheFiles(); // the URI is decoded via getPath(), not toString()
            String s = files[0].getPath();*/
            // Get the file
            //File file = new File(cacheFiles[0]);
            //String path = file.getPath();
            // Open the file; InputStreamReader converts bytes to characters
            BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(path)));
            // Read the file line by line
            String line;
            while (StringUtils.isNotEmpty(line = br.readLine())) {
                // Split the line into fields: deptno -> dname
                String[] split = line.split("\t");
                hashMap.put(Integer.parseInt(split[0]), split[1]);
            }
            IOUtils.closeStream(br);
        }

        /**
         * @describe :
         * @author : tunan
         * @param key
         * @param value
         * @param context
         * @return : void
         * @date : 2020/2/1 23:38
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] line = value.toString().split("\t");
            if (line.length >= 8) {
                Integer empno = Integer.parseInt(line[0].trim());
                String ename = line[1];
                Integer deptno = Integer.parseInt(line[line.length - 1].trim());
                // Look up the small table by deptno
                String dname = hashMap.get(deptno);
                context.write(new JoinMain(empno, ename, deptno, dname), NullWritable.get());
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            super.cleanup(context);
        }
    }
}
Author: Tunan
Link: http://yerias.github.io/2018/10/14/hadoop/11/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.