• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

30-Spark入门:Spark技术栈、分区、系统架构、算子和任务提交方式

武飞扬头像
大数据下的画像人
帮助1

17.1 Spark介绍

17.1.1 什么是Spark

  1. 概念理解

    • 并行计算框架
      • Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎。 Spark 是加州大学伯克利分校的AMP实验室所开源的类 Hadoop MapReduce 的通用并行计算框架
    • 任务的中间结果可以缓存在内存中,减少磁盘数据交互
      • Spark 拥有 Hadoop MapReduce 所具有的优点;但不同于 MapReduce 的是 Job 中间输出结果可以缓存在内存中,从而不再需要读写 HDFS ,减少磁盘数据交互,因此 Spark 能更好地适用于数据挖掘与机器学习等需要迭代的算法
    • Spark诞生于2009年美国加州伯克利分校的AMP实验室,基于内存计算的大数据并行计算框架,可用于构建大型的、低延迟的数据分析应用程序
  2. Spark和Hadoop的比较

    学新通

17.1.2 总体技术栈讲解

学新通

Spark 提供了 Sparkcore RDD 、 Spark SQL 、 Spark Streaming 、 Spark MLlib 、 Spark、GraphX等技术组件,可以一站式地完成大数据领域的离线批处理、交互式查询、流式计算、机器学习、图计算等常见的任务。这就是 spark 一站式开发的特点

17.1.3 Spark 与 MR 的区别

  1. MR

    • MR只能做离线计算,如果实现复杂计算逻辑,一个MR搞不定,就需要将多个MR按照先后顺序连成一串,一个MR计算完成后会将计算结果写入到HDFS中,下一个MR将上一个MR的输出作为输入,这样就要频繁读写HDFS,网络IO和磁盘IO会成为性能瓶颈。从而导致效率低下。
  2. Spark

    • spark既可以做离线计算,有可以做实时计算,提供了抽象的数据集(RDD、Dataset、DataFrame、DStream)有高度封装的API,算子丰富,并且使用了更先进的DAG有向无环图调度思想,可以对执行计划优化后在执行,并且可以数据可以cache到内存中进行复用
  3. 归根结底最重要的区别还是在于 下图为 MapReduce 执行任务流程(MR基于磁盘,Spark基于内存)

    • 都是分布式计算框架, Spark 计算中间结果基于内存缓存, MapReduce 基于 HDFS 存储。也正因此,Spark 处理数据的能力一般是 MR 的三到五倍以上, Spark 中除了基于内存计算这一个计算快的原因,
    • 还有 DAG(DAGShecdule) 有向无环图来切分任务的执行先后顺序。

学新通

  • MR的数据读取流程

    学新通

  • Spark的数据读取流程

    学新通

17.1.4 Spark API

多种编程语言的支持: Scala,Java,Python,R,SQL 。

17.1.5 Spark运行模式

  1. Local
    • 多用于本地测试,如在 eclipse , idea 中写程序测试等
  2. Standalone
    • Standalone 是 Spark 自带的一个资源调度框架,它支持完全分布式
    • 由Master负责资源的分配
  3. Yarn
    • Hadoop 生态圈里面的一个资源调度框架, Spark 也是可以基于 Yarn 来计算的
    • 若要基于 Yarn 来进行资源调度,必须实现 AppalicationMaster 接口, Spark 实现了这个接口,所以可以基于 Yarn 来进行资源调度
    • 由Yarn中的ResourceManager负责资源的分配
  4. Mesos
    • 资源调度框架
    • 由Messos中的Messos Master负责资源管理

17.1.6 Spark总结

学新通

17.2 Spark Core

17.2.1 Partition

1. 概念

  1. 分区的原因

    • 单节点处理不了大量的数据
    • Spark RDD 是一种分布式的数据集,由于数据量很大,因此要它被切分并存储在各个结点的分区当中
  2. RDD的理解

    • Spark中,RDD(Resilient Distributed Dataset 弹性分布式数据集),是最基本的抽象数据集,
    • 其中每个RDD由若干个Partition组成,不同的分区可能在集群的不同节点上
    • 多个Partition是并行操作的
    • 一个Partition对应一个Task

    学新通

2. 分区方式

对于Spark Shuffle阶段的理解

  • Spark Shuffle阶段共分为Shuffle Write阶段和Shuffle Read阶段,其中在Shuffle Write阶段中,Shuffle Map Task 对数据进行处理并产生中间数据

  • 然后再根据数据分区方式对中间数据进行分区

  • 最终Shuffle Read阶段中的Shuffle Read Task会拉取Shuffle Write阶段中产生的并已经分好区的中间数据

    学新通

Spark包含两种数据分区方式:HashPartitioner(哈希分区)和RangePartitioner(范围分区)

  1. HashPartitioner分区

    • Hash分区
    • HashPartitioner采用哈希的方式对<Key,Value>键值对数据进行分区
    • 其数据分区规则为 partitionId = Key.hashCode % numPartitions
      • partitionId代表该Key对应的键值对数据应当分配到的Partition标识
      • Key.hashCode表示该Key的哈希值
      • numPartitions表示包含的Partition个数
    • 可能会造成数据倾斜(数据量不均衡)
  2. RangePartitioner分区

    • 范围分区

    • 引入RangePartitioner的原因

      • Spark引入RangePartitioner的目的是为了解决HashPartitioner所带来的分区倾斜问题,也即分区中包含的数据量不均衡问题
    • HashPartitioner数据倾斜产生的原因

      • HashPartitioner采用哈希的方式将同一类型的Key分配到同一个Partition中,当某几种类型数据量较多时,就会造成若干Partition中包含的数据过大
      • 在Job执行过程中,一个Partition对应一个Task,此时就会使得某几个Task运行过慢
    • RangePartitioner基于抽样的思想来对数据进行分区

      学新通

3. HDFS-Block与Spark-Partition

spark本身并没有提供分布式文件系统,因此spark的分析大多依赖于Hadoop的分布式文件系统HDFS

  1. HDFS-Block

    • hdfs中的block是分布式存储的最小单元,类似于盛放文件的盒子,一个文件可能要占多个盒子,但一个盒子里的内容只可能来自同一份文件
  2. Spark-Partition

    • spark中的partition 是弹性分布式数据集RDD的最小单元,RDD是由分布在各个节点上的partition组成的
    • partition 是指的spark在计算过程中,生成的数据在计算空间内最小单元,同一份数据(RDD)的partition 大小不一,数量不定,是根据application里的算子和最初读入的数据分块数量决定的
  3. BLock和Partition的联系

    • Spark从HDFS读入文件的分区数默认等于HDFS文件的块数(blocks),HDFS中的block是分布式存储的最小单元。
    • 如果我们上传一个30GB的非压缩的文件到HDFS,HDFS默认的块容量大小128MB,因此该文件在HDFS上会被分为235块(30GB/128MB);Spark读取SparkContext.textFile()读取该文件,默认分区数等于块数即235
  4. Block和Partition的区别

      Block Partition
    位置 存储空间 计算空间
    大小 固定 不固定
    数据冗余 有冗余的、不会轻易丢失 没有冗余设计、丢失之后重新计算得到

17.2.2 RDD⭐️

RDD(Resilient Distributed Dataset) 弹性分布式数据集,Spark计算流程中的一个算子,类似于Storm中的Bolt,对数据进行计算,得到临时结果(partition)

1. RDD五大属性

  1. RDD是由一系列的partition组成的
  2. RDD中的每一个task运行在自己的Partition上
  3. RDD之间有一系列的依赖关系(依赖其他的RDD)
  4. 分区器是作用在(K,V)格式的RDD上
  5. RDD默认会寻找最佳的计算位置
    • 计算向数据靠拢,尽可能少的进行数据的拉取操作

2. RDD流程图

学新通

  • 理解
    • TextFile方法底层封装的是MR读取文件的方式,读取文件之前先进行split切片,默认split大小是一个block大小
    • RDD 实际上不存储数据,这里方便理解,暂时理解为存储数据
      • 真正存储数据的是partition,RDD不存储数据,RDD就是对这个partition的抽象
    • 什么是 K,V格式的RDD ?
      • 如果 RDD 里面存储的数据都是二元组对象,那么这个 RDD 我们就叫做 K,V格式的RDD
    • 哪里体现 RDD 的弹性(容错)?
      • partition 数量,大小没有限制,体现了 RDD 的弹性。
      • RDD 之间依赖关系,可以基于上一个 RDD 重新计算出 RDD
    • 哪里体现 RDD 的分布式?
      • RDD 是由 Partition 组成, partition 是分布在不同节点上的。
      • RDD 提供计算最佳位置,体现了数据本地化。体现了大数据中“计算移动数据不移动”的理念。

3. Lineage血统

利用内存加快数据加载,在其它的In-Memory类数据库或Cache类系统中也有实现。Spark的主要区别在于它采用血统(Lineage)来时实现分布式运算环境下的数据容错性(节点失效、数据丢失)问题

RDD 的最重要的特性之一就是血缘关系(Lineage ),它描述了一个 RDD 是如何从父 RDD 计算得来的。如果某个 RDD 丢失了,则可以根据血缘关系,从父 RDD 计算得来

当这个RDD的部分分区数据丢失时,它可以通过Lineage找到丢失的父RDD的分区进行局部计算来恢复丢失的数据,这样可以节省资源提高运行效率

17.2.3 系统架构

系统架构图一:

学新通

系统架构图二:

学新通

1. Master (Standalone)

  • 作用
    • 资源管理的主节点(进程)
    • Master是Standalone资源调度框架里面资源管理的主节点。也是JVM进程
    • 接受用户的请求

2. Cluster Manager

  • 作用

    • 在集群上获取资源的外部服务
    • 例如:standalone ; yarn ; mesos
  • 在Standalone模式下

    • Cluster Manager是Master(主节点),控制整个集群,监控Worker
  • 在Yarn模式下

    • Cluster Manager是资源管理者

3. Worker计算节点(Standalone)

  1. 理解
    • Worker是Standalone资源调度框架里面资源管理的从节点。也是JVM进程。
    • 资源管理的从节点(进程),或者说是管理本机资源的进程
  2. 作用
    • Standalone模式
      • 资源管理的从节点,负责控制计算节点,启动Executor
    • Yarn模式
      • 指的是NodeManager节点

4. Application

  • 概念理解
    • 基于Spark的用户程序,包含driver程序和运行在集群上的executor程序,即一个完整的spark应用

5. Driver(program)

  • 作用
    • 用来连接工作进程(worker)的程序
  • 概念理解
    • 驱动程序,Application中的main函数并创建SparkContext

6. Executor

  • 概念理解
    • 是在一个worker进程所管理的节点上为某个Application启动的一个个进程,
  • 作用
    • 这个进程负责运行任务,并且负责将数据存在内存或者磁盘上,每个应用之间都有各自独立的executor

7. Task

  • 概念理解
    • 被发送到executor上的工作单元

8. Job

  • 概念理解
    • 包含很多任务(Task) 的组成的并行计算,往往由Spark Action触发生成,一个Application中往往会产生多个Job

9. Stage

  • 概念理解
    • 一个job会被拆分成很多组任务(Task),每组任务(Task)被称为Stage
    • 就像MapReduce分为MapTask和ReduceTask一样
Spark代码流程
创建SparkConf对象
可以设置Application name。
可以设置运行模式及资源需求。
创建SparkContext对象
基于Spark的上下文创建一个RDD,对RDD进行处理。
应用程序中要有Action类算子来触发Transformation类算子执行。
关闭Spark上下文对象SparkContext。

17.2.4 Spark代码流程

  1. 创建SparkConf对象

    • 可以设置Application name。
    • 可以设置运行模式及资源需求。
    val sparkConf: SparkConf = new SparkConf().setMaster("local").setAppName("WordCount")
    
  2. 创建SparkContext对象

    val sparkContext: SparkContext = new SparkContext(sparkConf)
    
  3. 基于Spark的上下文创建一个RDD,对RDD进行处理。

    val value: RDD[(String,Int)] = sparkContext.textFile("src/main/resources/user.log")
    
  4. 应用程序中要有Action类算子来触发Transformation类算子执行。

    println(lines.count())
        lines.foreach(ele => println("foreach:"   ele))
        lines.take(5).foreach(ele => println("take:"   ele))
        println(lines.first())
        lines.collect().foreach(ele => println("collect:"   ele))
    
  5. 关闭Spark上下文对象SparkContext。

    sparkContext.stop()
    

17.3 算子(单文件)⭐️

什么是算子?
可以理解成spark RDD的方法,这些方法作用于RDD的每一个partition。因为spark的RDD是一个 lazy的计算过程,只有得到特定触发才会进行计算,否则不会产生任何结果

Spark 记录了RDD之间的生成和依赖关系,但是只有当F进行行动操作时,Spark才会根据RDD的依赖关系生成DAG,并从起点开始真正的计算,如下图的A、B、C、D分别是一个个RDD

学新通

学新通

17.3.1 转换算子

  1. 概念理解
    • Transformations 类算子叫做转换算子(本质就是函数),Transformations算子是延迟执行,也叫 懒加载 执行
  2. 常见的Transformations 类算子
    • filter: 过滤符合条件的记录数,true保留,false过滤掉
    • map:将一个RDD中的每个数据项,通过map中的函数映射变为一个新的元素
      • 特点:输入一条数据,输出一条数据
    • faltMap:先map后flat,与map类似,每个输入项可以映射0到多个输出项
    • sample:随机抽样算子,根据传进去的小数按比例进行有放回或者无放回的抽样
    • reduceByKey:将相同的key根据相应的逻辑进行处理
    • sortByKey/sortBy:作用在k,v格式的RDD上,对key进行升序或者降序排序

17.3.2 行动算子

  1. 概念理解
    • Action类算子叫做行动算子,Action类算子是触发执行
    • 一个Application应用程序中有几个Action类算子执行,就有几个job运行
  2. 常见Action类算子
    • count:返回数据集中的元素数,会在结果计算完成后回收到Driver端
    • take(n) :返回一个包含数据集前n个元素的集合
    • first:效果等同于 take(1) ,返回数据集中的第一个元素
    • foreach :循环遍历数据集中的每个元素,运行相应的逻辑
    • collect :将计算结果回收到 Driver 端

17.3.3 控制算子

  • 概念理解
    • 将RDD持久化,持久化的单位是Partition
    • 控制算子有3种,cache、persist、checkpoint,其中cache和persist都是 懒执行 的,必须有一个Action类算子来触发执行
    • checkpoint算子不仅能将RDD持久化到磁盘,还能切断RDD之间的依赖关系

1. cache

  • 默认将RDD的数据持久化到内存中,cache是 懒执行

    • cache() = persist() = persist(StorageLevel.Memory_Only)
  • rdd.cache().count() 返回的不是持久化的RDD,而是一个数值

    /**
    * 第一次并不会使用到缓存数据,因为是懒执行所以等到第一次Action才会开始缓存数据
    * 第二次用到的就是缓存数据
    */
    object Hello04Cache {
      def main(args: Array[String]): Unit = {
        //1.配置并创建对象
        val sparkContext = new SparkContext((new SparkConf().setMaster("local").setAppName("Hello04Cache"   System.currentTimeMillis())))
        //2.开始读取数据
        var lines: RDD[String] = sparkContext.textFile("src/main/resources/NASA_access_log_Aug95")
        //3.开始进行缓存
        lines = lines.cache()
        //4.开始进行计算
        val startTime = System.currentTimeMillis
        val count = lines.count
        val endTime = System.currentTimeMillis
        System.out.println("第一次共"   count   "条数据,"   "计算时间="   (endTime - startTime))
        val cacheStartTime = System.currentTimeMillis
        val cacheResult = lines.count
        val cacheEndTime = System.currentTimeMillis
        System.out.println("第二次共"   cacheResult   "条数据,"   "计算时间="   (cacheEndTime - cacheStartTime))
        //关闭sparkContext
        sparkContext.stop()
    
      }
    }
    
    学新通

2. persist

  1. 特点:

    • 可以指定持久化的级别,最常用的是 MEMORY_ONLY 和 MEMORY_AND_DISK。
  2. 持久化级别

    1. MEMORY_ONLY
    • 使用未序列化的Java对象格式,将数据保存在内存中。如果内存不够存放所有的数据,则数据可能就不会进行持久化。那么下次对这个RDD执行算子操作时,那些没有被持久化的数据,需要从源头处重新计算一遍。这是默认的持久化策略,使用cache()方法时,实际就是使用的这种持久化策略。
    1. MEMORY_AND_DISK
    • 使用未序列化的Java对象格式,优先尝试将数据保存在内存中。如果内存不够存放所有的数据,会将数据写入磁盘文件中,下次对这个RDD执行算子时,持久化在磁盘文件中的数据会被读取出来使用。
    1. MEMORY_ONLY_SER
    • 基本含义同MEMORY_ONLY。唯一的区别是,会将RDD中的数据进行序列化,RDD的每个partition会被序列化成一个字节数组。这种方式更加节省内存,从而可以避免持久化的数据占用过多内存导致频繁GC。
    1. MEMORY_AND_DISK_SER
    • 基本含义同MEMORY_AND_DISK。唯一的区别是,会将RDD中的数据进行序列化,RDD的每个partition会被序列化成一个字节数组。这种方式更加节省内存,从而可以避免持久化的数据占用过多内存导致频繁GC。
    1. DISK_ONLY
    • 使用未序列化的Java对象格式,将数据全部写入磁盘文件中。
    1. MEMORY_ONLY_2, MEMORY_AND_DISK_2, 等等。
  3. 代码实现

    object Hello05Persist {
      def main(args: Array[String]): Unit = {
        //1.配置并创建对象
        val sparkContext = new SparkContext((new SparkConf().setMaster("local").setAppName("Hello05Persist"   System.currentTimeMillis())))
        //2.开始读取数据
        var lines: RDD[String] = sparkContext.textFile("src/main/resources/NASA_access_log_Aug95")
        //3.开始进行缓存
        lines = lines.persist(StorageLevel.DISK_ONLY)
        //4.开始进行计算
        val startTime = System.currentTimeMillis
        val count = lines.count
        val endTime = System.currentTimeMillis
        System.out.println("第一次共"   count   "条数据,"   "计算时间="   (endTime - startTime))
        val cacheStartTime = System.currentTimeMillis
        val cacheResult = lines.count
        val cacheEndTime = System.currentTimeMillis
        System.out.println("第二次共"   cacheResult   "条数据,"   "计算时间="   (cacheEndTime - cacheStartTime))
        //关闭sparkContext
        sparkContext.stop()
      }
    }
    
    学新通

3. checkpoint

  1. 特点

    • checkpoint将RDD持久化到磁盘,还可以切断 RDD 之间的依赖关系,也是 懒执行
    • 当RDD使用cache机制从内存中读取数据,如果数据没有读到,会使用checkpoint机制读取数据。此时如果没有checkpoint机制,那么就需要找到父RDD重新计算数据了,因此checkpoint是个很重要的容错机制
    • checkpoint就是对于一个RDD chain(链)如果后面需要反复使用某些中间结果RDD,可能因为一些故障导致该中间数据丢失,那么就可以针对该RDD启动checkpoint机制
  2. 执行原理

    • 当RDD的 job 执行完毕之后,会从finalRDD从后往前回溯
    • 当回溯到某一个 RDD 调用了 checkpoint 方法,会对当前的RDD做一个标记
    • Spark 框架会自动启动一个新的 job ,从头开始重新计算这个 RDD 的数据,并将计算出的数据持久化到Checkpoint目录中
      • 以便下次可以快速访问到被标记了checkpoint的RDD,切断了RDD之间的依赖性
  3. 使用Checkpoint时常用的优化手段

    • 对RDD执行Checkpoint之前,最好对这个RDD先执行cache
    • 这样新启动的 job 只需要将内存中的数据拷贝到Checkpoint目录中就可以,省去了重新计算这一步
  4. 代码实现

    def main(args: Array[String]): Unit = {
        val sparkConf = new
        SparkConf().setMaster("local").setAppName("SparkCheckPoint"   System.currentTimeMillis())
        val sparkContext = new SparkContext(sparkConf)
        sparkContext.setCheckpointDir("./checkpoint")
        val lines: RDD[String] =
        sparkContext.textFile("src/main/resources/NASA_access_log_Aug95")
        val words: RDD[String] = lines.flatMap(_.split(" "))
        println("words"   words.getNumPartitions)
        words.checkpoint
        words.count
        sparkContext.stop
    }
    

17.4 Spark集群搭建

17.4.1 安装环境检测

  1. 搭建之前确认对应的 java 版本为8版本

  2. 搭建之前确认对应的 scala 版本为2.12.x版本。

    • [root@node01 ~]# rpm -ivh scala-2.12.11.rpm

    • [root@node01 ~]# whereis scala

    • [root@node01 ~]# vim /etc/profile

      export SCALA_HOME=/usr/share/scala
      export PATH=$SCALA_HOME/bin:$PATH
      
    • [root@node01 ~]# source /etc/profile

    • 三台计算机Node01 Node02 Node03都需要安装Scala

17.4.2 standalone(Single)

启动spark的集群

[root@node01 ~]# cd /opt/yjx/spark-2.4.6/sbin/
[root@node01 sbin]# ./start-all.sh

访问

http://192.168.88.101:8080/

运行案例

spark-submit --master spark://node01:7077 --class org.apache.spark.examples.SparkPi $SPARK_HOME/examples/jars/spark-examples_2.12-2.4.6.jar 10

17.4.3 standalone(HA)

  • 启动集群
    • Zookeeper :
      • 【123】zkServer.sh start
    • 主节点:
      • [root@node01 ~]# cd /opt/yjx/spark-2.4.6/sbin/
      • [root@node01 sbin]# ./start-all.sh
    • 备用节点
      • [root@node02 ~]# cd /opt/yjx/spark-2.4.6/sbin/
      • [root@node02 sbin]# ./start-master.sh

17.4.4 standalone(UI)

17.4.5 SparkShell

17.4.6 yarn模式

  • 启动集群

    • 启动Zookeeper:
      • 【123】zkServer.sh start
    • 启动Hadoop :
      • [root@node01 ~]# start-all.sh
    • 启动Spark:
      • [root@node01 ~]# cd /opt/yjx/spark-2.4.6/sbin/ [root@node01 sbin]# ./start-all.sh
  • 访问

    • spark: http://192.168.88.101:8080/
      hdfs: http://192.168.88.101:9870/
      yarn: http://192.168.88.101:8088
  • 提交任务

    spark-submit --master yarn --class org.apache.spark.examples.SparkPi $SPARK_HOME/examples/jars/spark-examples_2.12-2.4.6.jar 10
    

17.5 任务提交方式⭐️

Standalon 和 Yarn 的任务提交方式图解

学新通

17.5.1 Standalone-client

  1. 提交命令

    spark-submit --master spark://node01:7077 --deploy-mode client --class org.apache.spark.examples.SparkPi $SPARK_HOME/examples/jars/spark-examples_2.12-2.4.6.jar 10
    
  2. 执行流程

    学新通

    • client模式提交任务后,会在客户端启动Driver进程,来进行任务调度
    • Driver会向Master申请启动Application启动的资源,资源申请成功后
    • Driver端将task分发到worker端执行,启动executor进程(任务的分发)
    • worker端(executor进程)将task执行结果返回到Driver端(任务结果的回收)
  3. 总结

    • client模式适用于测试调试程序,Driver进程是在客户端启动的,这里的客户端就是指提交应用程序的当前节点。在Driver端可以看到task执行的情况
    • 生产环境下不能使用client 模式,是因为:假设要提交100个 application 到集群运行,Driver 每次都会在 client 端(单个节点)启动,那么就会导致客户端100次网卡流量暴增的问题

17.5.2 Standalone-cluster

  1. 提交命令

    spark-submit --master spark://node01:7077 --deploy-mode cluster --classorg.apache.spark.examples.SparkPi $SPARK_HOME/examples/jars/spark-examples_2.12-2.4.6.jar 10
    
  2. 执行流程

    学新通

    • cluster模式提交应用程序后,会向Master请求启动Driver
    • Master接受请求后,随机在集群中的一台节点来启动Driver进程
    • Driver启动后为当前应用程序申请资源
    • Driver端发送task到worker节点上执行(任务的分发)
    • Worker上的executor进程将执行情况和执行结果返回给Driver端(任务结果的回收)
  3. 总结

    • Standalone-cluster 提交方式,应用程序使用的所有 jar 包和文件,必须保证所有的worker 节点都要有,因为此种方式, spark 不会自动上传包
    • 两种保证所有的Worker节点都有应用程序所需的jar包和文件
      • 将所有的依赖包和文件打到同一个包中,然后放在 hdfs 上。
      • 将所有的依赖包和文件各放一份在 worker 节点上

17.5.3 yarn-client

  1. 提交命令

    spark-submit --master yarn --class org.apache.spark.examples.SparkPi $SPARK_HOME/examples/jars/spark-examples_2.12-2.4.6.jar 10
    
    spark-submit --master yarn–client --class org.apache.spark.examples.SparkPi $SPARK_HOME/examples/jars/spark-examples_2.12-2.4.6.jar 10
    
    spark-submit --master yarn --deploy-mode client --class org.apache.spark.examples.SparkPi $SPARK_HOME/examples/jars/spark-examples_2.12-2.4.6.jar 10
    
  2. 执行流程

    学新通

    版本一:

    • 客户端提交一个Application,在客户端启动一个Driver进程
    • 应用程序启动后会向RS(ResourceManager)(相当于Standalone模式下的master进程)发送请求
    • RS收到请求后,随机选择一台NM(NodeManager)启动AM
      • 这里的NM相当于Standalone中的Worker进程
    • AM启动后,会向RS请求一批container资源,用于启动Executor
    • RS会找到一批NM(包含container)返回给AM,用于启动Executor
    • AM会向NM发送命令启动Executor
    • Executor启动后,会 反向注册 给Driver,Driver发送task到Executor,执行情况和结果返回个Driver端

    版本二:

    • 在YARN Cluster模式下,任务提交后会和ResourceManager通讯申请启动 ApplicationMaster, 随后 ResourceManager 分配 container,在合适的 NodeManager上启动 ApplicationMaster,此时的 ApplicationMaster 就是 Driver
    • Driver 启动后向 ResourceManager 申请 Executor内存,ResourceManager接到ApplicationMaster 的资源申请后会分配 container,然后在合适的 NodeManager 上启动 Executor 进程,Executor 进程启动后会向Driver反向注册,Executor全部注册完成后Driver开始执行main函数,之后执行到 Action 算子时,触发一个 job,并根据宽依赖开始划分 stage,每个stage生成对应的taskSet,之后将 task 分发到各个Executor上执行

    版本三:

    1. 在client端启动Driver进程,初始化作业,解析程序,初始化两个类:DAGScheduler,TaskScheduler.
      – 初始化作业: 判断路径是否存在,权限校验等
      – DAGScheduler将程序的执行流程解析成DAG图,并划分阶段,根据阶段内的分区初始化Task
      – TaskScheduler接收Task,等待分配Task给executor
    2. Driver会向ResourceManager,申请资源,想要启动该应用程序的AppMaster
    3. ResourceManager会分配一个节点,来运行AppMaster,由NodeManager负责真正分配资源运行AppMaster
    4. AppMaster会向ResourceManager申请整个程序所需要的其他资源,准备运行executor进程
    5. 在各个节点上运行的executor会向Driver进行反向注册,要求分配任务
    6. TaskScheduler将Task分配到不同的executor,并监控实时状态,executor开始执行任务,
    7. TaskScheduler收到executor执行完的信息后,表示整个应用程序完成,会向ResouceManager申请注销
  3. 总结

    • Yarn-client 模式同样是适用于测试,因为 Driver 运行在本地, Driver 会与 yarn 集群中的 Executor 进行大量的通信
    • ApplicationMaster (executorLauncher)的在此模式中的作用:
      • 为当前的 Application 申请资源
      • 给 NodeManager 发送消息启动 Executor
      • 注意: ApplicationMaster 在此种模式下没有作业调度的功能

17.5.4 yarn-cluster

  1. 提交命令

    spark-submit --master yarn --deploy-mode cluster --class org.apache.spark.examples.SparkPi $SPARK_HOME/examples/jars/spark-examples_2.12-2.4.6.jar 10
    
    spark-submit --master yarn-cluster --class org.apache.spark.examples.SparkPi $SPARK_HOME/examples/jars/spark-examples_2.12-2.4.6.jar 10
    
  2. 执行流程

    学新通

    版本一:

    • 客户机提交Application应用程序,发送请求到RS(ApplicationMaster),请求启动AM(ApplicationMaster)
    • RS收到请求后随机在一台NM(NodeManager)上启动AM(相当于Driver端)
    • ApplicationMaster启动后,ApplicationMaster发送请求到RS,请求一批container用于启动Executor
    • ApplicationMaster返回一批NM节点给ApplicationMaster
    • ApplicationMaster连接到NM,发送请求到NM启动Executor
    • Executor反向注册到AM所在的节点的Driver,Driver发送task到Executor

    版本二:

    1. client会首先向ResourceManager申请资源,要求启动AppMaster进程
    2. ResouceManager会分配一个节点,由NodeManager来运行AppMaster,并在AppMaster所在节点运行Driver进程
      Driver进程的作用:初始化作业,解析程序,初始化两个DAGScheduler,TaskScheduler.
      – 初始化作业: 判断路径是否存在,权限校验等
      – DAGScheduler将程序的执行流程解析成DAG图,并划分阶段,根据阶段内的分区初始化Task
      – TaskScheduler接收Task,等待分配Task给executor
    3. AppMaster会向ResourceManager申请整个程序所需要的其他资源,准备运行executor进程
    4. 在各个节点上运行的executor会向Driver进行反向注册,要求分配任务
    5. TaskScheduler将Task分配到不同的executor,并监控实时状态,executor开始执行任务,
    6. TaskScheduler收到executor执行完的信息后,表示整个应用程序完成,会向ResouceManager申请注销
  3. 总结

    • Yarn-Cluster主要用于生产环境中,因为 Driver 运行在 Yarn 集群中某一台 NodeManager中,每次提交任务的 Driver 所在的机器都不再是提交任务的客户端节点,而是多个 NM 节点中的一台,不会产生某一台机器网卡流量激增的现象,但同样也有缺点,任务提交后不能看到日志。只能通过 yarn 查看日志
    • ApplicationMaster 在此模式中的的作用:
      • 为当前的 Application 申请资源
      • 给 NodeManger 发送消息启动 Executor
      • 任务调度

17.5.5 Standalone和Yarn的对比

  1. 相同点

    • standalone是spark自身携带的资源管理框架
    • yarn是hadoop中的资源管理框架。
    • 都是对核心和内存进行管理和分配。
  2. 作业方式不同

    • spark 的standalone模式使用的是spark自身的集群管理器

    • yarn模式是将spark作业运行在yarn上。

  3. 多用户支持不同

    • standalone对于多用户多application支持的不好,只能支持fifo模式进行资源调度,先来的任务先执行,后来的任务就后执行。也可以通过配置,让先来的任务不占用所有资源,给后来的任务留点资源

    • yarn模式对于多用户多任务支持比较好,arn中有fifo调度器,容量调度器,公平调度器这三种资源分配策略,可以动态实现资源的扩缩,更灵活,更重

  4. Yarn和Spark的Standalone调度模式对比

    Yarn Standalone 节点功能
    ResouceManager Master 管理子节点、资源调度、接收任务请求
    NodeManger Worker 管理当前节点,并管理子进程
    YarnChild Executor 运行真正的计算逻辑的(Task)
    Client Client 提交app,管理该任务的Executor
    ApplicaitonMaster ApplicaitonMaster 管理任务,包含driver程序和运行在集群上的executor程序
    • Spark-Standalone
      • client:提交任务的节点就是Drive
      • Cluster:由集群选择一个节点作为Driver
    • Spark-Yarn
      • Client:提交的节点就是Driver,本身还是AM,但任务调度归属于Driver
      • Cluster:Driver和AM二合一,AM是负责任务调度的

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhfjihkf
系列文章
更多 icon
同类精品
更多 icon
继续加载