编译Spark&Idea配置Spark环境&RDD五大特点&Spark参数管理&数据的读写

目录

  1. 编译Spark
  2. Idea配置Spark环境
  3. RDD五大特点
  4. Spark参数管理
  5. 数据的读写

编译Spark

作为一个Spark玩的6的攻城狮,第一步就是要学会如何编译Spark

  1. 下载spark源码: 官网或者github

  2. 查看官网编译文档,切记注意版本号,不同版本号编译方式区别很大

  3. 修改相关配置

    1. 注释掉make-distribution.sh脚本中的128行左右一下,使用固定的版本替代

      VERSION=2.4.5
      SCALA_VERSION=2.12.10
      SPARK_HADOOP_VERSION=2.6.0-cdh5.16.2
      SPARK_HIVE=1

      替代==>

      #VERSION=$("$MVN" help:evaluate -Dexpression=project.version $@ 2>/dev/null\
      # | grep -v "INFO"\
      # | grep -v "WARNING"\
      # | tail -n 1)
      #SCALA_VERSION=$("$MVN" help:evaluate -Dexpression=scala.binary.version $@ 2>/dev/null\
      # | grep -v "INFO"\
      # | grep -v "WARNING"\
      # | tail -n 1)
      #SPARK_HADOOP_VERSION=$("$MVN" help:evaluate -Dexpression=hadoop.version $@ 2>/dev/null\
      # | grep -v "INFO"\
      # | grep -v "WARNING"\
      # | tail -n 1)
      #140 #SPARK_HIVE=$("$MVN" help:evaluate -Dexpression=project.activeProfiles -pl sql/hive $@ 2>/dev/null\
      # | grep -v "INFO"\
      # | grep -v "WARNING"\
      # | fgrep --count "<id>hive</id>";\
      # # Reset exit status to 0, otherwise the script stops here if the last grep finds nothing\
      # # because we use "set -o pipefail"
      # echo -n)
    2. 修改maven仓库地址,在253行左右,pom.xml文件

      Maven Repository
      <!--<url>https://repo.maven.apache.org/maven2</url> -->
      <url>http://maven.aliyun.com/nexus/content/groups/public/</url>

      加上cdh的下载地址
      <repository>
      <id>cloudera</id>
      <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
      <releases>
      <enabled>true</enabled>
      </releases>
      <snapshots>
      <enabled>false</enabled>
      </snapshots>
      </repository>
  4. 开始编译

    maven编译前执行

    export MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=1g"

    make-distribution.sh编译不需要执行,我们这里使用make-distribution.sh编译,它的脚本自动执行了这句

    ./dev/change-scala-version.sh 2.12
    ./dev/make-distribution.sh --name 2.6.0-cdh5.16.2 --tgz -Pyarn -Phive -Phive-thriftserver -Pscala-2.12 -Phadoop-2.6 -Dhadoop.version=2.6.0-cdh5.16.2
  5. 查看生成的jar包

    /home/hadoop/app/spark-2.4.5/spark-2.4.5-bin-2.6.0-cdh5.16.2.tgz

Idea配置Spark环境

  1. idae引入依赖

    <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <hadoop.version>2.6.0-cdh5.16.2</hadoop.version>
    <scala.version>2.12.10</scala.version>
    <scala.tools.version>2.12</scala.tools.version>
    <spark.version>2.4.5</spark.version>
    </properties>
    <dependencies>
    <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>${hadoop.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_${scala.tools.version}</artifactId>
    <version>${spark.version}</version>
    </dependency>

    <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
    </dependency>
    </dependencies>
  2. 配置阿里云和cdh的仓库地址

    <repositories>
    <repository>
    <id>cloudera</id>
    <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    <snapshots>
    <enabled>false</enabled>
    </snapshots>
    </repository>
    <repository>
    <id>central</id>
    <name>aliyun maven</name>
    <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    <layout>default</layout>
    <!-- 是否开启发布版构件下载 -->
    <releases>
    <enabled>true</enabled>
    </releases>
    <!-- 是否开启快照版构件下载 -->
    <snapshots>
    <enabled>false</enabled>
    </snapshots>
    </repository>
    </repositories>

RDD五大特点

先看官方介绍:

弹性分布式数据集(RDD),是Spark的基本抽象。表示可以并行操作的不可变的、分区的元素集合。这个类包含所有RDDS上可用的基本操作,如mapfilterfilter

[[org.apache.spark.rdd.PairRDDFunctions]],包含仅在键值对的RDDs上可用的操作,如groupByKeyjoin ;

[[org.apache.spark.rdd.DoubleRDDFunctions]] ,包含仅在双精度的RDDs上可用的操作;

[[org.apache.spark.rdd.SequenceFileRDDFunctions]],包含可在RDDs上使用的操作,这些操作可以保存为序列文件。

所有的操作都可以通过隐式转换的方式在任何正确类型的RDD上自动使用(例如RDD[(Int, Int)]);

RDD: resilient distributed dataset(弹性分布式数据集),

  1. 弹性表示容错

  2. 分布式表示分区

  3. 数据集表示集

每个RDD有五个主要特征:

  1. 一个RDD由很多partition构成(block块对应partition),在spark中,有多少partition就对应有多少个task来执行。
  2. 对RDD做计算,相当于对RDD的每个partition或split做计算
  3. RDD之间有依赖关系,可溯源,容错机制
  4. 如果RDD里面存的数据是key-value形式,则可以进行重新分区
  5. 最优位置计算,也就是数据的本地性,移动计算而不是移动数据。

RDD是一个顶级的抽象类,它有五个抽象方法,分别实现了这五个特性

  1. 分区

    /**
    * Implemented by subclasses to return the set of partitions in this RDD. This method will only
    * be called once, so it is safe to implement a time-consuming computation in it.
    *
    * The partitions in this array must satisfy the following property:
    * `rdd.partitions.zipWithIndex.forall { case (partition, index) => partition.index == index }`
    */
    protected def getPartitions: Array[Partition]
  2. 计算

    /**
    * :: DeveloperApi ::
    * Implemented by subclasses to compute a given partition.
    */
    @DeveloperApi
    def compute(split: Partition, context: TaskContext): Iterator[T]
  3. 依赖

    /**
    * Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
    * be called once, so it is safe to implement a time-consuming computation in it.
    */
    protected def getDependencies: Seq[Dependency[_]] = deps
  4. 重新分区

    /** Optionally overridden by subclasses to specify how they are partitioned. */
    @transient val partitioner: Option[Partitioner] = None
  5. 最优位置

    /**
    * Optionally overridden by subclasses to specify placement preferences.
    */
    protected def getPreferredLocations(split: Partition): Seq[String] = Nil

Spark参数管理

  1. 如果想要定义自己的参数传递到spark中去,一定要以spark.开头

  2. 如果想要获取spark中的参数的值,使用sc.getConf.get(key)

数据的读写

  1. 读本地数据:
    sc.textFile("file://") 需要添加file://

  2. 读hdfs数据:
    sc.textFile("") 默认读hdfs 不需要加前缀
    注意:textFile() 可以使用通配符匹配目录、指定文件、指定文件夹

  3. wholeTextFiles()

    如果使用的是wholeTextFiles() ,它会返回路径+内容

  4. 写本地

    rdd.saveAsTestFile("path")
  5. 写HDFS需要配置

    System.setProperty("HADOOP_USER_NAME", "hadoop");
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "hdfs://192.168.91.10:9000");
    conf.set("dfs.client.use.datanode.hostname", "true");
  6. 压缩写

    saveAsTestFile(out,classOf[BZip2Codec])
  7. 对象写

    case class(name:String,age:Int)
    val p1 = ("张三",18)
    val p2 = ("李四",21)
    parallelize(List(p1,p2)).saveAsObjectFile("out")
  8. 对象读

    sc.objectFile[Persion]("out").collect().foreach(println)
Author: Tunan
Link: http://yerias.github.io/2019/10/01/spark/1/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.