Spark、IDEA和Maven的环境准备&Hadoop的依赖以及常用API&WordCount Debug流程&map、reduce方法的参数类型和作用&瘦包在服务器上的jar包依赖

目标

  1. Spark、IDEA和Maven的环境准备
  2. hadoop的依赖以及常用API
  3. WordCount Debug流程
  4. map、reduce方法的参数类型和作用
  5. Writable和WritableComparable的作用
  6. 瘦包在服务器上的jar包依赖

Spark、IDEA和Maven的环境准备

环境:

  1. Spark3.0
  2. IDEA19.3
  3. Maven3.6.3(安装配置阿里云的镜像)

Hadoop的依赖以及常用API

依赖:

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.0-cdh5.16.2</version>
</dependency>

常用API:

FileSystem fileSystem; //核心
open() //打开文件返回流
mkdirs() //创建目录
create() //创建文件
copyFromLocalFile() //从本地复制文件到hdfs,类似于get
copyToLocalFile() //从hdfs复制文件到本地,类似于put
listFiles() //列出目录下的所有文件,可以迭代
delete() //删除文件,不存在报错
deleteOnExit() //删除存在的文件,不存在不报错

WordCount Debug流程

  1. 编译

    job.waitForCompletion(true);
  2. 提交

    if (state == JobState.DEFINE) {
    submit();
    }
  3. 兼容新老API

    setUseNewAPI();
  4. 本地连接/服务器连接

    connect();
  5. 检查配置、输出路径等

    final JobSubmitter submitter = getJobSubmitter(cluster.getFileSystem(),cluster.getClient());
    public JobStatus run() throws IOException, InterruptedException,ClassNotFoundException {
    return submitter.submitJobInternal(Job.this, cluster);
    }
    checkSpecs(job); //validate the jobs output specs
  6. 把该作业的配置信息加到分布式缓存中

    Configuration conf = job.getConfiguration();
    addMRFrameworkToDistributedCache(conf);
  7. 创建该Job对应的存放目录

    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
  8. 拿到该Job对应的ID(local/application)

    JobID jobId = submitClient.getNewJobID();
  9. jobStagingArea/jobid

    Path submitJobDir = new Path(jobStagingArea, jobId.toString());
  10. 拷贝job对应的信息到jobStagingArea/jobid

    copyAndConfigureFiles(job, submitJobDir);
  11. 完成我们输入数据的切片(默认128MB,预留10%浮动空间)

    int maps = writeSplits(job, submitJobDir);
  12. 作业文件提交到指定目录

    writeConf(conf, submitJobFile);
  13. 提交作业

    status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());

map、reduce方法的参数类型和作用

  • 继承Mapper后实现map方法

    protected void map(LongWritable key, Text value, Context context)

    该方法中的参数分别是LongWritable key, Text value, Context context

    前两个参数是map方法中输入的键和值,输入的键和值必须是LongWritable类型和Text类型,因为是按offset读取每行本文数据,最后一个参数context,它是MapReduce的上下文。什么都可以拿出来。

  • 继承Reducer后实现reduce方法

    protected void reduce(Text key, Iterable<IntWritable> values, Context context)

    该方法中的参数分别是Text key, Iterable<IntWritable> values, Context context

    前两个参数是reduce方法中输入的键和值,输入的键和值对应map中输出的键值类型,并且值是一个Iterable类型,因为在shuffle阶段相同key的value分到了一起,是一个可迭代的参数。因为是按offset读取每行本文数据,最后一个参数context,它是MapReduce的上下文。什么都可以拿出来。

Writable和WritableComparable的作用

Writable是hadoop中的序列化接口,是一个接口,只定义了两个方法,分别是write()readFields()方法,用于hadoop序列化时的读和写;WritableComparable也是一个序列化接口,只是在序列化的同时同时实现了java中的Comparable<T>接口,具有排序的特性。

hadoop是java写的,那么为什么hadoop要实现自己的序列化接口

  • java序列化数据结果比较大、传输效率比较低、不能跨语言对接

hadoop使用的是RPC协议传送数据,且hadoop是应用在大集群上,所以hadoop的序列化必须做到

  • 占用空间更小
  • 传输速度更快
  • 扩展性更强,支持多种格式的序列化
  • 兼容性更好,需要支持多种语言,如java、scala、python等

所以hadoop实现了自己的序列化接口Writable:压缩速度扩展性兼容性都比java更优秀

另外:

  1. 序列化的对象,他们超越了JVM的生死,不顾生他们的母亲,化作永恒。static和transient修饰的属性除外,因为static修饰的属性是在编译时静态生成的,而对象是动态生成的,又因为transient修饰的属性禁止了属性的序列化。
  2. 把“活的”对象序列化,就是把“活的”对象转化成一串字节,而“反序列化”,就是从一串字节里解析出“活的”对象。

瘦包在服务器上的jar包依赖

打包好的mapreduce程序上传到云主机,由于是瘦包,缺少某些依赖,比如连接mysql的的jar包,现在我们就解决缺少依赖的问题

  1. 将下载好的jar包上传到云主机上

    cp mysql-connector-java-5.1.27-bin.jar ~/lib/
  2. 将jar包加载到hadoop的classpath中

    export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:~/lib/mysql-connector-java-5.1.27-bin.jar
  3. 用hadoop jar 执行jar文件时,加上-libjars参数

    hadoop jar hadoop-mapreduce-examples-2.6.0-cdh5.16.2.jar wordcount -libjars /home/hadoop/lib/mysql-connector-java-5.1.27-bin.jar  /1.txt /out

如果上诉方法有问题可以使用hadoop的分布式缓存

  1. 把jar包传到集群上,命令如下

    hadoop fs -put mysql-connector-java-5.1.27.jar /lib
  2. 在mr程序提交job前,添加一下语句:

    job.addArchiveToClassPath(new Path("hdfs://aliyun:9000/lib/mysql-connector-java-5.1.27.jar"));
Author: Tunan
Link: http://yerias.github.io/2018/10/12/hadoop/9/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.