主要内容:

  • 介绍Spark的产生
  • 介绍Spark的基本架构图
  • 介绍Spark的4种提交运行模式
  • 介绍RDD

1、为什么会有Spark?

Spark 之前, 已经有非常成熟的计算框架存在了, 例如 MapReduce。虽然这些框架提供了对访问计算资源的抽象,但是它们缺少了对利用分布式内存的抽象。如MR中数据复用时是将中间数据写到一个稳定的文件系统,如HDFS中, 会产生数据的复制备份、磁盘的I/O、以及数据的序列化等问题。MR执行迭代计算的任务过程:

多个 MR 任务之间没有基于内存的数据共享方式,只能通过磁盘来进行共享。这对于常见的数据复用场景是非常低效的:迭代式计算、交互式数据挖掘、图计算等。而Spark 使用了内存保存中间结果,能在数据尚未写入硬盘时在内存中进行运算。Spark是基于内存的计算框架。学术界为此提出了一个新的模型:RDD。RDD是一个可以容错且并行的数据结构,它可以让用户显式的将中间结果数据集保存在内中,并且通过控制数据集的分区来达到数据存放处理最优化。RDD执行迭代计算的任务过程:

1.1 Spark比MR快的原因?

主要有两个原因:

  • Spark基于内存,MR基于磁盘。
  • Spark基于线程,MR基于进程。MR任务以进程的方式运行在yarn集群中,Spark任务以线程的方式运行在进程中。进程中可以启动很多个线程,而开启一个进程与开启一个线程需要的时间和调度代价是不一样。 开启一个进程需要的时间远远大于开启一个线程。

2、Spark架构图

  • Driver:执行main方法,并创建SparkContext,该对象是所有Spark程序的入口。创建SparkContext的目的是为了准备Spark应用程序的运行环境,在Spark中由SparkContext负责与ClusterManager通信,进行资源申请、任务的分配和监控等,当Executor部分运行完毕后,Driver同时负责将SparkContext关闭。通常用SparkContext代表Driver。
  • ClusterManager:集群管理器。用来提供计算资源的外部服务。
    • Standalone:Master主节点
    • YARN:ResourceManager节点
    • Mesos:类似YARN
  • Worker:集群中的节点。在Standalone模式中指配置的worker节点;在YARN模式中指NodeManager节点。
  • Executor:运行在worker节点上的一个进程,负责运行task。
  • Task:Executor上运行的最小单元。是运行Application的基本单位,多个Task组成一个Stage,而Task的调度和管理等是由TaskScheduler负责。

2.1 Spark中Job、Stage、Task之间的关系

3、Spark的提交运行模式

无论是Standalone,还是Spark on YARN模式,都有Client和Cluster两种方式。

3.1 Standalone-Client模式

Client可以运行在集群的Master节点,也可以运行在本地Client。

在Client端通过spark-submit将任务提交到集群中的Master节点。

Driver运行在Client上,Driver要和集群中的Worker节点通信,也就说Driver不能断开,适合运行交互类任务。

3.2 Standalone-Cluster模式

Client通过spark-submit将任务提交到集群中的Master节点。

Driver运行在集群中某一个Worker节点上。必须将jar包放到hdfs,保证每个worker节点都可以访问到这个jar。Client在提交完任务后可断开,不会影响程序的运行。不适合交互类任务。

3.3 YARN-Client模式

Client通过spark-submit将任务提交到集群中的ResourceManager。任务提交到ResourceManager之后,在某个NodeManager中启动Container运行ApplicationMaster进程。Client模式中的 AppMaster不运行SparkContext,只与SparkContext通信进行资源的派分。

Driver运行在Client,并在Client上创建SparkContext对象。AppMaster需要和Driver进行通信,注册发布自己的位置。所以Driver也不能断开。

3.4 YARN-Cluster模式

Client通过spark-submit将任务提交到集群中的Master节点。任务提交到ResourceManager之后,在某个NodeManager中启动Container运行ApplicationMaster进程,然后在AppMaster中初始化SparkContext对象。

总结:

  • 理解YARN-Client和YARN-Cluster深层次的区别之前先清楚一个概念:Application Master。在YARN中,每个Application实例都有一个ApplicationMaster进程,它是Application启动的第一个容器。它负责和ResourceManager打交道并请求资源,获取资源之后告诉NodeManager为其启动Container。从深层次的含义讲YARN-Cluster和YARN-Client模式的区别其实就是ApplicationMaster进程的区别
  • YARN-Cluster模式下,Driver运行在AM(Application Master)中,它负责向YARN申请资源,并监督作业的运行状况。当用户提交了作业之后,就可以关掉Client,作业会继续在YARN上运行,因而YARN-Cluster模式不适合运行交互类型的作业。
  • YARN-Client模式下,Application Master仅仅向YARN请求Executor,Client会和请求的Container通信来调度他们工作,也就是说Client不能离开。

4、RDD

4.1 RDD基本介绍

RDD,全称为 Resilient Distributed Datasets,是一个容错的,并行的数据结构,可以让用户显式地将数据存储到磁盘和内存中,并能控制数据的分区。RDD本身并不存储数据,只是在计算的时候才读取的。

如果站在RDD设计者的角度上,这个类中至少需要包含以下五大属性:

  • Partition List 可分区:一个RDD有多个分区,每个分区包含部分数据。可分区,意味着可以并行计算,是可分布式计算的基础。
  • Compute Function 计算函数:Spark中RDD的计算是以分区为单位的,每个RDD都会实现计算函数以达到这个目的。为了实现容错,也需要记录RDD之间转换所执行的计算函数。
  • RDD Dependencies 依赖关系:RDD与RDD之间是互相依赖的,可以通过依赖关系和计算函数进行容错。
  • Partitioner 分区器(可选):为了执行Shuffled操作,必须要有一个函数用来计算数据应该发往哪个分区。只有对于key-value的RDD,并且产生shuffle,才会有Partitioner。
  • Preferred Location 数据本地性(可选):Spark任务在调度的时候会优先考虑存有数据的节点开启计算任务,减少数据的网络传输,提升计算效率。

4.2 RDD算子介绍

RDD 还提供了一组丰富的算子来操作这些数据。RDD的算子大致可以分类两类:

  • Transformation-转换操作:在一个已经存在的RDD上创建一个新的RDD。
  • Action-动作操作:执行各个分区的计算任务,将计算结果返回到Driver中。

执行 RDD 的时候,在执行到转换操作的时候,并不会立刻执行获得结果,而是直到遇见了Action操作,才会触发真正的执行,这个特点叫做惰性求值。默认情况下,每一个Action运行的时候,其所关联的所有Transformation RDD都会重新计算,但是也可以使用presist和checkpoint方法将RDD持久化到磁盘或者内存中。

4.3 RDD的分区和Shuffle

RDD分区主要是用来实现并行计算,本质上分区和Shuffle没有什么关系,但是在进行数据处理的时候,有时候需要把Key相同的Value拉到一起进行计算,而这些Key相同的Value可能分布在不同的分区里,甚至可能在不同的节点中, 但是它们必须被共同计算。比如redeceByKey算子,为了让相同的key的所有数据都在reduceByKey的同一个reduce中处理,就需要执行一个all-to-all的操作,需要在不同的节点(或分区)之间拷贝数据,必须跨分区聚集相同Key的所有数据,这个过程就叫Shuffle。Shuffle是Spark对数据进行重分区的机制。所以,理解分区才能更好地理解Shuffle的原理。

Spark的Shuffle大致可以分为两种:

  • Hash Based Shuffle:会产生大量的中间磁盘文件,影响性能。Spark1.2之前使用。
  • Sort Based Shuffle:相比于Hash Based Shuffle,有所改进:每个Task在进行shuffle操作时,虽然也会产生较多的临时磁盘文件,但是最后会将所有的临时文件合并(merge)成一个磁盘文件,因此每个Task就只有一个磁盘文件。在下一个stage的shuffle read task拉取自己的数据时,只要根据索引读取每个磁盘文件中的部分数据即可。

4.4 RDD的依赖关系

子RDD和父RDD之间的依赖关系有两种不同的类型:

  • 窄依赖:每一个父RDD的分区最多被子RDD的一个分区使用。不会产生Shuffle。
  • 宽依赖:每一个父RDD的分区被子RDD的多个分区使用。会产生Shuffle。

4.5 RDD缓存和Checkpoint机制

使用缓存的原因:

  • 多次使用RDD
  • 容错

除此之外,使用缓存的理由还有很多。但是总结一句,就是缓存能够帮助开发者在进行一些昂贵操作后,将其结果保存下来,以便下次使用无需再次执行,缓存能够显著的提升性能。

RDD缓存的方法:cache、persist、checkpoint。三者区别:

  • cashe和persist:
    • cache默认数据缓存在内存中。persist可以把数据保存在内存或者磁盘中,有丰富的缓存级别。都存在数据丢失的风险。
    • 要触发 cache 和 persist 持久化操作,需要有一个action操作。
    • 它不会开启其他新的任务,一个action操作就对应一个job 。
    • 它不会改变rdd的依赖关系,程序运行完成后对应的缓存数据就自动消失。
  • checkpoint:
    • 数据存储在分布式文件系统HDFS上。利用了HDFS高可用性,高容错性(多副本)来最大程度保证数据的安全性。
    • 要触发checkpoint持久化操作,需要有一个action操作。
    • 会开启新的job执行checkpoint操作。
    • 它会改变rdd的依赖关系,后续数据丢失了不能够再通过血统进行数据的恢复。
    • 程序运行完成后对应的checkpoint数据就不会消失。

4.6 Spark的任务调度

  • Driver端运行客户端的main方法,构建SparkContext对象,在SparkContext对象内部依次构建DAGScheduler和TaskScheduler。
  • 按照RDD的一系列操作顺序,来生成DAG有向无环图。
  • DAGScheduler拿到DAG有向无环图之后,按照宽依赖进行stage的划分。每一个stage内部有很多可以并行运行的task,最后封装在一个一个的taskSet集合中,然后把taskSet发送给TaskScheduler。
  • TaskScheduler得到taskSet集合之后,依次遍历取出每一个task提交到worker节点上的executor进程中运行。
  • 所有task运行完成,整个任务也就结束了。

为什么要划分Stage?

由于划分完stage之后,在同一个stage中只有窄依赖,没有宽依赖,可以实现流水线计算,stage中的每一个分区对应一个task,在同一个stage中就有很多可以并行运行的task。