目标
- 数据倾斜
- MRchain解决数据倾斜
- 大小表Reduce Join
- 大小表Map Join
- SQL的执行计划
数据倾斜
数据倾斜怎么造成的
mapreduce计算是将map相同的key丢到reduce,在reduce中进行聚合操作,在map和reduce中间有个shuffle操作,shuffle会将map阶段相同的key划分到reduce阶段中的一个reduce中去,数据倾斜就是数据的key 的分化严重不均,造成一部分数据很多,一部分数据很少的局面。
数据倾斜产生的问题
有一个或多个reduce卡住
各种container报错OOM
读写的数据量极大,至少远远超过其它正常的reduce
伴随着数据倾斜,会出现任务被kill等各种诡异的表现。
原因和解决方法
原因:
- 单个值有大量记录(1.内存的限制存在,2.可能会对集群其他任务的运行产生不稳定的影响)
- 唯一值较多(单个唯一值的记录数占用内存不会超过分配给reduce的内存)
解决办法:
增加reduce个数
使用自定义partitioner
增加reduce 的jvm内存(效果不好)
map 阶段将造成倾斜的key 先分成多组加随机数并且在reduce阶段去除随机数
从业务和数据上解决数据倾斜
我们通过设计的角度尝试解决它
- 数据预处理,过滤掉异常值
- 将数据打散让它的并行度变大,再汇集
平台的优化方法
- join 操作中,使用 map join 在 map 端就先进行 join ,免得到reduce 时卡住
- 能先进行 group 操作的时候先进行 group 操作,把 key 先进行一次 reduce,之后再进行 count 或者 distinct count 操作
- 设置map端输出、中间结果压缩
MRchain解决数据倾斜
核心思想: 第一个mapredue把具有数据倾斜特性的数据加盐(随机数),进行聚合;第二个mapreduce把第一个mapreduce的加盐结果进行去盐,再聚合,问题是两个MR IO高。
参考代码:
package com.tunan.item; |
SQL的执行计划
如何运行SQL的执行计划
EXPLAIN [EXTENDED] Syntax |
EXPLAIN select e.empno,e.ename,d.deptno,d.dname from emp e join dept d on e.deptno = d.deptno; |
解析这句SQL的执行计划
+-----------------------------------------------------------------+--+ |
从执行计划得知,hive中执行SQL语句底层执行的是MapReduce。
我们在SQL中关联了两张表分别是emp dept,并从两张表中取出某些字段,在SQL执行计划中共分为三个阶段,分别是stage4、stage3、stage0。
stage4是根stage,stage3依赖stage4,同时stage0依赖stage3。
stage4是一个本地作业,读取的是dept表,输出一个Map类型的hashTable,关联的key是两张表的deptno,在执行计划中表现为0 deptno和 1 deptno,即执行的MapReduce中的Key是deptno字段。
stage3是MapReduce中的Map阶段,扫描emp表,执行一个Map Join操作,条件是两张表的dept字段相等(内连接),现在我们得到的是一张包含所有字段的大表,得到需要的字段的对应位置,并且匹配字段的类型,在输出的时候检查是否需要压缩,以及输入、输出、和序列化类型
Stage-0阶段取出limit中指定的记录数
总结: 我们发现执行该SQL没有Reduce阶段,在现有的版本中默认设置hive.auto.convert.join
(是否自动转换为mapjoin)为true,该参数配合hive.mapjoin.smalltable.filesize
参数(小表的最大文件大小)默认为25M。即小于25M的表为小表,自动转为mapjoin,小表上传到hadoop缓存,提供给各个大表join使用。大表和小表根据关联的key形成一张大表,取出select需要的字段,最后根据limit设置的值取出对应的记录数。
参考参数:
--是否自动转换为mapjoin |
大小表Reduce Join(emp、dept)
Reduce Join的核心思路是定义输出字段作为一个实体类,用来作为输出,实体类中定义一个标志用来区分表的来源
将大小两个表在SQL中join的字段作为MapReduce中的key,原因是MapReduce中的key具有排序和分区的作用
Map中获取context中切片所在的文件名,按行获取文件中的数据并且根据获取的文件名分别将数据set到对象中,并写出Map。
Reduce中每次获取key相同的一组value值数据,这组value值既有dept中的数据,也
有emp中的数据,只要他们有相同的key,就会在shuffle中丢到一个reduce,这时候获取这组数据的值,根据flag来判断来自哪个表,如果是dept表则将数据设置到新new出来的对象中,添加到List列表中,同时创建一个保存emp表中数据的变量,由于emp表是小表,emp表中需要的数据对应dept/emp中的key的字段是唯一的,所以只需要把value中所有的对象都遍历循环出来,dept表数据添加到了List列表,emp表的数据添加到了变量中,最后循环List列表把变量set到每一个对象中,即完成了全部对象的全部成员属性。最后输出即可。
参考代码:
package com.tunan.hadoop.join; |
大小表Map Join(emp、dept)
Map Join的核心思想是把小表添加到缓存中(Map中),在map中读取大表每行数据时set到对象值时取出小表(Map)对应key的值即可
setup中,通过context获取小表文件切片的路径,然后通过读取流的方式读取为字符,按行获取到字符后切分,使用HashMap结构设置key和value分别为map方法中大表join时需要的键和值。
在map方法中读取文件数据,并且根据key取出HashMap(小表)中的value,一起set到对象中即可,最后写出,写出时,可以把value设置为NullWritable。
参考代码:
package com.tunan.hadoop.join;
import com.tunan.hadoop.pojo.JoinMain;
import com.tunan.hadoop.utils.FileUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.codehaus.groovy.runtime.wrappers.LongWrapper;
import java.io.*;
import java.net.URI;
import java.util.HashMap;
/**
* @description:
* @author: tunan
* @create: 2020-02-01 23:10
* @since: 1.0.0
**/
public class MapJoinDriver extends Configured implements Tool {
private String in = "data/join/emp.txt";
private String out = "out";
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
int run = ToolRunner.run(conf, new MapJoinDriver(), args);
System.exit(run);
}
/**
* @param strings
* @return : int
* @describe : 设置配置文件,不用设置reduce
* @author : tunan
* @date : 2020/2/1 23:14
*/
public int run(String[] strings) throws Exception {
Configuration conf = super.getConf();
FileUtil.checkFileIsExists(conf,out);
Job job = Job.getInstance(conf);
job.setJarByClass(MapJoinDriver.class);
job.addCacheFile(new URI("data/join/dept.txt"));
job.setMapperClass(MapperJoin.class);
job.setOutputKeyClass(JoinMain.class);
job.setOutputValueClass(NullWritable.class);
job.setNumReduceTasks(0);
FileInputFormat.setInputPaths(job, new Path(in));
FileOutputFormat.setOutputPath(job, new Path(out));
boolean b = job.waitForCompletion(true);
return b ? 0 : 1;
}
private static class MapperJoin extends Mapper<LongWritable, Text, JoinMain, NullWritable> {
private HashMap<Integer, String> hashMap = new HashMap<>();
protected void setup(Context context) throws IOException, InterruptedException {
//得到缓存文件路径
//String path = context.getCacheFiles()[0].getPath().toString();
String path = context.getCacheFiles()[0].getPath();
/*URI[] files = context.getCacheFiles(); //URI 通过getPath()解码 没有toString()方法
String s = files[0].getPath();*/
//得到文件
//File file = new File(cacheFiles[0]);
//String path = file.getPath();
//得到文件的流 InputStreamReader将字节转换为字符
BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(path)));
//读取文件为字符串
String line ;
while(StringUtils.isNotEmpty(line=br.readLine())){
//切分字符串得到字符串数组
String[] split = line.split("\t");
hashMap.put(Integer.parseInt(split[0]),split[1]);
}
IOUtils.closeStream(br);
}
/**
* @describe :
* @author : tunan
* @param key
* @param value
* @param context
* @return : void
* @date : 2020/2/1 23:38
*/
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] line = value.toString().split("\t");
if (line.length >= 8){
Integer empno = Integer.parseInt(line[0].trim());
String ename = line[1];
Integer deptno = Integer.parseInt(line[line.length-1].trim());
String dname = hashMap.get(deptno);
context.write(new JoinMain(empno,ename,deptno,dname),NullWritable.get());
}
}
protected void cleanup(Context context) throws IOException, InterruptedException {
super.cleanup(context);
}
}
}