目录
- 编译Spark
- Idea配置Spark环境
- RDD五大特点
- Spark参数管理
- 数据的读写
编译Spark
作为一个Spark玩的6的攻城狮,第一步就是要学会如何编译Spark
查看官网编译文档,切记注意版本号,不同版本号编译方式区别很大
修改相关配置
注释掉make-distribution.sh脚本中的128行左右一下,使用固定的版本替代
VERSION=2.4.5
SCALA_VERSION=2.12.10
SPARK_HADOOP_VERSION=2.6.0-cdh5.16.2
SPARK_HIVE=1替代==>
#VERSION=$("$MVN" help:evaluate -Dexpression=project.version $@ 2>/dev/null\
# | grep -v "INFO"\
# | grep -v "WARNING"\
# | tail -n 1)
#SCALA_VERSION=$("$MVN" help:evaluate -Dexpression=scala.binary.version $@ 2>/dev/null\
# | grep -v "INFO"\
# | grep -v "WARNING"\
# | tail -n 1)
#SPARK_HADOOP_VERSION=$("$MVN" help:evaluate -Dexpression=hadoop.version $@ 2>/dev/null\
# | grep -v "INFO"\
# | grep -v "WARNING"\
# | tail -n 1)
#140 #SPARK_HIVE=$("$MVN" help:evaluate -Dexpression=project.activeProfiles -pl sql/hive $@ 2>/dev/null\
# | grep -v "INFO"\
# | grep -v "WARNING"\
# | fgrep --count "<id>hive</id>";\
# # Reset exit status to 0, otherwise the script stops here if the last grep finds nothing\
# # because we use "set -o pipefail"
# echo -n)修改maven仓库地址,在253行左右,pom.xml文件
Maven Repository
<!--<url>https://repo.maven.apache.org/maven2</url> -->
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
加上cdh的下载地址
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
开始编译
maven编译前执行
export MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=1g"
make-distribution.sh编译不需要执行,我们这里使用make-distribution.sh编译,它的脚本自动执行了这句
./dev/change-scala-version.sh 2.12
./dev/make-distribution.sh --name 2.6.0-cdh5.16.2 --tgz -Pyarn -Phive -Phive-thriftserver -Pscala-2.12 -Phadoop-2.6 -Dhadoop.version=2.6.0-cdh5.16.2查看生成的jar包
/home/hadoop/app/spark-2.4.5/spark-2.4.5-bin-2.6.0-cdh5.16.2.tgz
Idea配置Spark环境
idae引入依赖
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<hadoop.version>2.6.0-cdh5.16.2</hadoop.version>
<scala.version>2.12.10</scala.version>
<scala.tools.version>2.12</scala.tools.version>
<spark.version>2.4.5</spark.version>
</properties><dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.tools.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
</dependencies>配置阿里云和cdh的仓库地址
<repositories>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
<repository>
<id>central</id>
<name>aliyun maven</name>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
<layout>default</layout>
<!-- 是否开启发布版构件下载 -->
<releases>
<enabled>true</enabled>
</releases>
<!-- 是否开启快照版构件下载 -->
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
RDD五大特点
先看官方介绍:
弹性分布式数据集(RDD),是Spark的基本抽象。表示可以并行操作的不可变的、分区的元素集合。这个类包含所有RDDS上可用的基本操作,如map
、filter
和filter
。
[[org.apache.spark.rdd.PairRDDFunctions]]
,包含仅在键值对的RDDs上可用的操作,如groupByKey
和join
;
[[org.apache.spark.rdd.DoubleRDDFunctions]]
,包含仅在双精度的RDDs上可用的操作;
[[org.apache.spark.rdd.SequenceFileRDDFunctions]]
,包含可在RDDs上使用的操作,这些操作可以保存为序列文件。
所有的操作都可以通过隐式转换的方式在任何正确类型的RDD上自动使用(例如RDD[(Int, Int)]);
RDD: resilient distributed dataset(弹性分布式数据集),
弹性表示容错
分布式表示分区
数据集表示集
每个RDD有五个主要特征:
- 一个RDD由很多partition构成(block块对应partition),在spark中,有多少partition就对应有多少个task来执行。
- 对RDD做计算,相当于对RDD的每个partition或split做计算
- RDD之间有依赖关系,可溯源,容错机制
- 如果RDD里面存的数据是key-value形式,则可以进行重新分区
- 最优位置计算,也就是数据的本地性,移动计算而不是移动数据。
RDD是一个顶级的抽象类,它有五个抽象方法,分别实现了这五个特性
分区
/**
* Implemented by subclasses to return the set of partitions in this RDD. This method will only
* be called once, so it is safe to implement a time-consuming computation in it.
*
* The partitions in this array must satisfy the following property:
* `rdd.partitions.zipWithIndex.forall { case (partition, index) => partition.index == index }`
*/
protected def getPartitions: Array[Partition]计算
/**
* :: DeveloperApi ::
* Implemented by subclasses to compute a given partition.
*/
def compute(split: Partition, context: TaskContext): Iterator[T]依赖
/**
* Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
* be called once, so it is safe to implement a time-consuming computation in it.
*/
protected def getDependencies: Seq[Dependency[_]] = deps重新分区
/** Optionally overridden by subclasses to specify how they are partitioned. */
val partitioner: Option[Partitioner] = None最优位置
/**
* Optionally overridden by subclasses to specify placement preferences.
*/
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
Spark参数管理
如果想要定义自己的参数传递到spark中去,一定要以
spark.
开头如果想要获取spark中的参数的值,使用
sc.getConf.get(key)
数据的读写
读本地数据:
sc.textFile("file://")
需要添加file://
读hdfs数据:
sc.textFile("")
默认读hdfs 不需要加前缀
注意:textFile() 可以使用通配符匹配目录、指定文件、指定文件夹wholeTextFiles()
如果使用的是wholeTextFiles() ,它会返回路径+内容
写本地
rdd.saveAsTestFile("path")
写HDFS需要配置
System.setProperty("HADOOP_USER_NAME", "hadoop");
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://192.168.91.10:9000");
conf.set("dfs.client.use.datanode.hostname", "true");压缩写
saveAsTestFile(out,classOf[BZip2Codec])
对象写
case class(name:String,age:Int)
val p1 = ("张三",18)
val p2 = ("李四",21)
parallelize(List(p1,p2)).saveAsObjectFile("out")对象读
sc.objectFile[Persion]("out").collect().foreach(println)