目标
- Spark、IDEA和Maven的环境准备
- hadoop的依赖以及常用API
- WordCount Debug流程
- map、reduce方法的参数类型和作用
- Writable和WritableComparable的作用
- 瘦包在服务器上的jar包依赖
Spark、IDEA和Maven的环境准备
环境:
- Spark3.0
- IDEA19.3
- Maven3.6.3(安装配置阿里云的镜像)
Hadoop的依赖以及常用API
依赖:
<dependency> |
常用API:
FileSystem fileSystem; //核心 |
WordCount Debug流程
编译
job.waitForCompletion(true);
提交
if (state == JobState.DEFINE) {
submit();
}兼容新老API
setUseNewAPI();
本地连接/服务器连接
connect();
检查配置、输出路径等
final JobSubmitter submitter = getJobSubmitter(cluster.getFileSystem(),cluster.getClient());
public JobStatus run() throws IOException, InterruptedException,ClassNotFoundException {
return submitter.submitJobInternal(Job.this, cluster);
}
checkSpecs(job); //validate the jobs output specs把该作业的配置信息加到分布式缓存中
Configuration conf = job.getConfiguration();
addMRFrameworkToDistributedCache(conf);创建该Job对应的存放目录
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
拿到该Job对应的ID(local/application)
JobID jobId = submitClient.getNewJobID();
jobStagingArea/jobid
Path submitJobDir = new Path(jobStagingArea, jobId.toString());
拷贝job对应的信息到jobStagingArea/jobid
copyAndConfigureFiles(job, submitJobDir);
完成我们输入数据的切片(默认128MB,预留10%浮动空间)
int maps = writeSplits(job, submitJobDir);
作业文件提交到指定目录
writeConf(conf, submitJobFile);
提交作业
status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
map、reduce方法的参数类型和作用
继承Mapper后实现map方法
protected void map(LongWritable key, Text value, Context context)
该方法中的参数分别是
LongWritable key, Text value, Context context
前两个参数是map方法中输入的键和值,输入的键和值必须是LongWritable类型和Text类型,因为是按offset读取每行本文数据,最后一个参数context,它是MapReduce的上下文。什么都可以拿出来。
继承Reducer后实现reduce方法
protected void reduce(Text key, Iterable<IntWritable> values, Context context)
该方法中的参数分别是
Text key, Iterable<IntWritable> values, Context context
前两个参数是reduce方法中输入的键和值,输入的键和值对应map中输出的键值类型,并且值是一个Iterable类型,因为在shuffle阶段相同key的value分到了一起,是一个可迭代的参数。因为是按offset读取每行本文数据,最后一个参数context,它是MapReduce的上下文。什么都可以拿出来。
Writable和WritableComparable的作用
Writable是hadoop中的序列化接口,是一个接口,只定义了两个方法,分别是write()
和readFields()
方法,用于hadoop序列化时的读和写;WritableComparable
也是一个序列化接口,只是在序列化的同时同时实现了java中的Comparable<T>
接口,具有排序的特性。
hadoop是java写的,那么为什么hadoop要实现自己的序列化接口
- java序列化数据结果比较大、传输效率比较低、不能跨语言对接
hadoop使用的是RPC协议传送数据,且hadoop是应用在大集群上,所以hadoop的序列化必须做到
- 占用空间更小
- 传输速度更快
- 扩展性更强,支持多种格式的序列化
- 兼容性更好,需要支持多种语言,如java、scala、python等
所以hadoop实现了自己的序列化接口Writable:压缩
、速度
、扩展性
、兼容性
都比java更优秀
另外:
- 序列化的对象,他们超越了JVM的生死,不顾生他们的母亲,化作永恒。static和transient修饰的属性除外,因为static修饰的属性是在编译时静态生成的,而对象是动态生成的,又因为transient修饰的属性禁止了属性的序列化。
- 把“活的”对象序列化,就是把“活的”对象转化成一串字节,而“反序列化”,就是从一串字节里解析出“活的”对象。
瘦包在服务器上的jar包依赖
打包好的mapreduce程序上传到云主机,由于是瘦包,缺少某些依赖,比如连接mysql的的jar包,现在我们就解决缺少依赖的问题
将下载好的jar包上传到云主机上
cp mysql-connector-java-5.1.27-bin.jar ~/lib/
将jar包加载到hadoop的classpath中
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:~/lib/mysql-connector-java-5.1.27-bin.jar
用hadoop jar 执行jar文件时,加上-libjars参数
hadoop jar hadoop-mapreduce-examples-2.6.0-cdh5.16.2.jar wordcount -libjars /home/hadoop/lib/mysql-connector-java-5.1.27-bin.jar /1.txt /out
如果上诉方法有问题可以使用hadoop的分布式缓存
把jar包传到集群上,命令如下
hadoop fs -put mysql-connector-java-5.1.27.jar /lib
在mr程序提交job前,添加一下语句:
job.addArchiveToClassPath(new Path("hdfs://aliyun:9000/lib/mysql-connector-java-5.1.27.jar"));