
Spark Source Code Tracing (7): Caching and Checkpoint Mechanisms

cangchen@csdn

1. Entry Point

1.1 Test Code

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

// `log` is assumed to be an org.slf4j.Logger defined on the enclosing object.
def main(args: Array[String]): Unit = {
    log.info("-------begin---------")
    val sparkConf = new SparkConf().setAppName("persistTest").setMaster("local[3]")
    val sparkContext = new SparkContext(sparkConf)
    sparkContext.setCheckpointDir("./")
    val rdd = sparkContext.parallelize(Array(1, 1, 2, 3, 4, 4, 5), 2)
    val mapRDD = rdd.map(num => {
      println("--------map----------")
      (num, 1)
    })
    mapRDD.persist()
    // mapRDD.checkpoint()

    val shuffledRDD = mapRDD.reduceByKey(_ + _)
    log.info("---1---" + mapRDD.toDebugString)
    log.info("---2---" + shuffledRDD.toDebugString)
    shuffledRDD.collect().map(x => log.info(x._1 + ":" + x._2))
    log.info("+++3+++" + mapRDD.toDebugString)
    log.info("+++4+++" + shuffledRDD.toDebugString)
    println("---5---")
    printRDD(mapRDD)
    println("---6---")
    printRDD(shuffledRDD)
    val groupRDD = mapRDD.groupByKey(2)
    groupRDD.collect().map(println(_))
    sparkContext.stop()
  }

  // Recursively print the class name of an RDD and of every RDD it depends on.
  def printRDD(rdd: RDD[_]): Unit = {
    println(rdd.getClass.getName)
    if (rdd.dependencies.nonEmpty) {
      rdd.dependencies.map(dependency => printRDD(dependency.rdd))
    }
  }

1.2 Dependencies Between the Operations

(figure: dependency relationships between the operations in the test code)

1.3 Dependencies Between the Generated RDDs

(figure: dependency relationships between the RDDs generated by the test code)

2. Tracing the Spark cache/persist Source Code

2.1 Key Logs from the Cached Run


21/12/19 10:02:25 INFO WordCount: ---log1---(2) MapPartitionsRDD[1] at map at WordCount.scala:17 [Memory Deserialized 1x Replicated]
 |  ParallelCollectionRDD[0] at parallelize at WordCount.scala:16 [Memory Deserialized 1x Replicated]
21/12/19 10:02:25 INFO WordCount: ---log2---(2) ShuffledRDD[2] at reduceByKey at WordCount.scala:23 []
 +-(2) MapPartitionsRDD[1] at map at WordCount.scala:17 []
    |  ParallelCollectionRDD[0] at parallelize at 
--------map----------
--------map----------
--------map----------
--------map----------
--------map----------
--------map----------
--------map----------

21/12/19 10:02:27 INFO WordCount: 4:2
21/12/19 10:02:27 INFO WordCount: 2:1
21/12/19 10:02:27 INFO WordCount: 1:2
21/12/19 10:02:27 INFO WordCount: 3:1
21/12/19 10:02:27 INFO WordCount: 5:1
21/12/19 10:02:27 INFO WordCount: +++log3+++(2) MapPartitionsRDD[1] at map at WordCount.scala:17 [Memory Deserialized 1x Replicated]
 |       CachedPartitions: 2; MemorySize: 288.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
 |  ParallelCollectionRDD[0] at parallelize at WordCount.scala:16 [Memory Deserialized 1x Replicated]
21/12/19 10:02:27 INFO WordCount: +++log4+++(2) ShuffledRDD[2] at reduceByKey at WordCount.scala:23 []
 +-(2) MapPartitionsRDD[1] at map at WordCount.scala:17 []
    |      CachedPartitions: 2; MemorySize: 288.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
    |  ParallelCollectionRDD[0] at parallelize at WordCount.scala:16 []
---log5---
org.apache.spark.rdd.MapPartitionsRDD
org.apache.spark.rdd.ParallelCollectionRDD
---log6---
org.apache.spark.rdd.ShuffledRDD
org.apache.spark.rdd.MapPartitionsRDD
org.apache.spark.rdd.ParallelCollectionRDD

2.2 Registering an RDD for Caching

/**
 * Persist this RDD with the default storage level (`MEMORY_ONLY`).
 */
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

/**
 * Persist this RDD with the default storage level (`MEMORY_ONLY`).
 */
def cache(): this.type = persist()

cache() is simply persist() with the MEMORY_ONLY storage level.
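As a side note, choosing the storage level explicitly looks like the minimal sketch below (reusing mapRDD from the test code in 1.1):

import org.apache.spark.storage.StorageLevel

// cache() and persist(StorageLevel.MEMORY_ONLY) mark the RDD in exactly the same way;
// nothing is materialized until an action runs.
mapRDD.cache()
// mapRDD.persist(StorageLevel.MEMORY_ONLY)      // identical to cache()
// mapRDD.persist(StorageLevel.MEMORY_AND_DISK)  // a different level has to be chosen before any
//                                               // level has been assigned; changing it afterwards
//                                               // throws UnsupportedOperationException (see the
//                                               // persist(newLevel, allowOverride) source below)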

/**
 * Set this RDD's storage level to persist its values across operations after the first time
 * it is computed. This can only be used to assign a new storage level if the RDD does not
 * have a storage level set yet. Local checkpointing is an exception.
 */
def persist(newLevel: StorageLevel): this.type = {
  if (isLocallyCheckpointed) {
    // This means the user previously called localCheckpoint(), which should have already
    // marked this RDD for persisting. Here we should override the old storage level with
    // one that is explicitly requested by the user (after adapting it to use disk).
    persist(LocalRDDCheckpointData.transformStorageLevel(newLevel), allowOverride = true)
  } else {
    persist(newLevel, allowOverride = false)
  }
}

Leaving the checkpoint case aside for now, look at the persist(newLevel: StorageLevel, allowOverride: Boolean): this.type method.
RDD.scala

private var storageLevel: StorageLevel = StorageLevel.NONE


/**
 * Mark this RDD for persisting using the specified level.
 *
 * @param newLevel the target storage level
 * @param allowOverride whether to override any existing level with the new one
 */
private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = {
  // TODO: Handle changes of StorageLevel
  if (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) {
    throw new UnsupportedOperationException(
      "Cannot change storage level of an RDD after it was already assigned a level")
  }
  // If this is the first time this RDD is marked for persisting, register it
  // with the SparkContext for cleanups and accounting. Do this only once.
  if (storageLevel == StorageLevel.NONE) {
    sc.cleaner.foreach(_.registerRDDForCleanup(this))
    sc.persistRDD(this)
  }
  storageLevel = newLevel
  this
}

SparkContext.scala

/**
 * Register an RDD to be persisted in memory and/or disk storage
 */
private[spark] def persistRDD(rdd: RDD[_]): Unit = {
  persistentRdds(rdd.id) = rdd
}

// Keeps track of all persisted RDDs
private[spark] val persistentRdds = {
  val map: ConcurrentMap[Int, RDD[_]] = new MapMaker().weakValues().makeMap[Int, RDD[_]]()
  map.asScala
}

This only registers the RDD to be cached with the SparkContext and changes the storageLevel field; no actual caching happens here, and none could, because at this point there is no data to cache. An RDD only encapsulates computation logic and holds no data itself; data flows through an RDD only when an action submits a job and triggers computation. So the next things to follow are the RDD's iterator and compute methods.
See also: Spark Source Code Tracing (6): Executing RDD Logic.
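The laziness is easy to observe with a small sketch (again reusing the objects from 1.1; getStorageLevel and getPersistentRDDs are public API):

mapRDD.persist()
println(mapRDD.getStorageLevel)                 // the level set above (MEMORY_ONLY)
println(sparkContext.getPersistentRDDs.keySet)  // already contains mapRDD.id

// No partition has actually been cached yet; the first action triggers compute()
// and only then does the block manager store the partitions.
mapRDD.count()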

2.3 How Caching Is Implemented

RDD.scala

/**
 * Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
 * This should ''not'' be called by users directly, but is available for implementors of custom
 * subclasses of RDD.
 */
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    getOrCompute(split, context)
  } else {
    computeOrReadCheckpoint(split, context)
  }
}

/**
 * Gets or computes an RDD partition. Used by RDD.iterator() when an RDD is cached.
 */
private[spark] def getOrCompute(partition: Partition, context: TaskContext): Iterator[T] = {
  val blockId = RDDBlockId(id, partition.index)
  var readCachedBlock = true
  // This method is called on executors, so we need call SparkEnv.get instead of sc.env.
  SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, elementClassTag, () => {
    readCachedBlock = false
    computeOrReadCheckpoint(partition, context)
  }) match {
    case Left(blockResult) =>
      if (readCachedBlock) {
        val existingMetrics = context.taskMetrics().inputMetrics
        existingMetrics.incBytesRead(blockResult.bytes)
        new InterruptibleIterator[T](context, blockResult.data.asInstanceOf[Iterator[T]]) {
          override def next(): T = {
            existingMetrics.incRecordsRead(1)
            delegate.next()
          }
        }
      } else {
        new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
      }
    case Right(iter) =>
      new InterruptibleIterator(context, iter.asInstanceOf[Iterator[T]])
  }
}

/**
 * Compute an RDD partition or read it from a checkpoint if the RDD is checkpointing.
 */
private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
{
  if (isCheckpointedAndMaterialized) {
    firstParent[T].iterator(split, context)
  } else {
    compute(split, context)
  }
}

As described above, storageLevel has already been changed to the level requested by the user (StorageLevel.MEMORY_ONLY if none was specified) and is no longer StorageLevel.NONE, so getOrCompute(split, context) is executed here.

The fourth parameter of SparkEnv.get.blockManager.getOrElseUpdate is an anonymous function that takes no arguments and returns an iterator over the data. The function passed in here calls computeOrReadCheckpoint, which invokes the current RDD's concrete compute method and wraps the result in an iterator; in other words, this anonymous function computes and returns the current RDD's data.


/**
 * Retrieve the given block if it exists, otherwise call the provided `makeIterator` method
 * to compute the block, persist it, and return its values.
 *
 * @return either a BlockResult if the block was successfully cached, or an iterator if the block
 *         could not be cached.
 */
def getOrElseUpdate[T](
    blockId: BlockId,
    level: StorageLevel,
    classTag: ClassTag[T],
    makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
  // Attempt to read the block from local or remote storage. If it's present, then we don't need
  // to go through the local-get-or-put path.
  get[T](blockId)(classTag) match {
    case Some(block) =>
      return Left(block)
    case _ =>
      // Need to compute the block.
  }
  // Initially we hold no locks on this block.
  doPutIterator(blockId, makeIterator, level, classTag, keepReadLock = true) match {
    case None =>
      // doPut() didn't hand work back to us, so the block already existed or was successfully
      // stored. Therefore, we now hold a read lock on the block.
      val blockResult = getLocalValues(blockId).getOrElse {
        // Since we held a read lock between the doPut() and get() calls, the block should not
        // have been evicted, so get() not returning the block indicates some internal error.
        releaseLock(blockId)
        throw new SparkException(s"get() failed for block $blockId even though we held a lock")
      }
      // We already hold a read lock on the block from the doPut() call and getLocalValues()
      // acquires the lock again, so we need to call releaseLock() here so that the net number
      // of lock acquisitions is 1 (since the caller will only call release() once).
      releaseLock(blockId)
      Left(blockResult)
    case Some(iter) =>
      // The put failed, likely because the data was too large to fit in memory and could not be
      // dropped to disk. Therefore, we need to pass the input iterator back to the caller so
      // that they can decide what to do with the values (e.g. process them without caching).
     Right(iter)
  }
}

/**
 * Get a block from the block manager (either local or remote).
 *
 * This acquires a read lock on the block if the block was stored locally and does not acquire
 * any locks if the block was fetched from a remote block manager. The read lock will
 * automatically be freed once the result's `data` iterator is fully consumed.
 */
def get[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
  val local = getLocalValues(blockId)
  if (local.isDefined) {
    logInfo(s"Found block $blockId locally")
    return local
  }
  val remote = getRemoteValues[T](blockId)
  if (remote.isDefined) {
    logInfo(s"Found block $blockId remotely")
    return remote
  }
  None
}

Continuing into SparkEnv.get.blockManager.getOrElseUpdate: the actual value of its fourth parameter makeIterator is exactly that data-producing function (not yet invoked at this point).
Inside the method body, the block is first looked up in the cache; if it is found, the result is returned directly (makeIterator is never invoked, so no RDD compute is triggered). Otherwise the caching path is taken. That call chain is quite involved, touching memory management, disk I/O and so on, so it is not traced further here.
It is enough to know that this method both caches data and reads cached data: if a cached copy already exists it is returned directly; otherwise the data is computed first and then cached according to the storage level. If the block already exists or was stored successfully, a Left(BlockResult) is returned; if caching fails, a Right(Iterator[T]) is returned, handing the RDD's data straight back to the caller.
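The contract is easier to see in a toy, stand-alone analogue (this is not Spark code, just a sketch of the same get-or-else-update pattern): the thunk runs only on a miss, Left means the value is (or now is) stored, and Right means it could not be stored and is handed straight back to the caller.

import scala.collection.mutable

// Toy analogue of BlockManager.getOrElseUpdate: makeValue is evaluated only on a cache miss.
def getOrElseUpdate[K, V](cache: mutable.Map[K, V], key: K, canStore: Boolean)(
    makeValue: () => V): Either[V, V] =
  cache.get(key) match {
    case Some(v) => Left(v)                      // hit: the thunk is never evaluated
    case None =>
      val v = makeValue()                        // miss: compute exactly once
      if (canStore) { cache(key) = v; Left(v) }  // stored successfully
      else Right(v)                              // could not store: hand the value back
  }

// usage: getOrElseUpdate(mutable.Map.empty[Int, String], 1, canStore = true)(() => "computed")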

2.4 Does Caching Rebuild the RDD Lineage?

Some articles online claim that once an RDD is cached its lineage is rebuilt, the evidence being that the dependency chain shown by toDebugString gains a CachedPartitions entry. Comparing log1/log2 with log3/log4 does show CachedPartitions lines being added, but that does not mean the RDD's dependencies actually changed.

21/12/19 09:12:49 INFO WordCount: ---log1---(2) MapPartitionsRDD[1] at map at WordCount.scala:17 [Memory Deserialized 1x Replicated]
 |  ParallelCollectionRDD[0] at parallelize at WordCount.scala:16 [Memory Deserialized 1x Replicated]
21/12/19 09:12:50 INFO WordCount: ---log2---(2) ShuffledRDD[2] at reduceByKey at WordCount.scala:23 []
 +-(2) MapPartitionsRDD[1] at map at WordCount.scala:17 []
    |  ParallelCollectionRDD[0] at parallelize at WordCount.scala:16 []
21/12/19 09:12:51 INFO WordCount: +++log3+++(2) MapPartitionsRDD[1] at map at WordCount.scala:17 [Memory Deserialized 1x Replicated]
 |       CachedPartitions: 2; MemorySize: 288.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
 |  ReliableCheckpointRDD[3] at collect at WordCount.scala:26 [Memory Deserialized 1x Replicated]
21/12/19 09:12:51 INFO WordCount: +++log4+++(2) ShuffledRDD[2] at reduceByKey at WordCount.scala:23 []
 +-(2) MapPartitionsRDD[1] at map at WordCount.scala:17 []
    |      CachedPartitions: 2; MemorySize: 288.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
    |  ReliableCheckpointRDD[3] at collect at WordCount.scala:26 []
---log5---
org.apache.spark.rdd.MapPartitionsRDD
org.apache.spark.rdd.ParallelCollectionRDD
---log6---
org.apache.spark.rdd.ShuffledRDD
org.apache.spark.rdd.MapPartitionsRDD
org.apache.spark.rdd.ParallelCollectionRDD

In my view this claim is wrong: the toDebugString code merely prints the CachedPartitions information; it does not actually add a node to the dependency chain.
log5 and log6 show that the RDD lineage has not changed, and I did not find anywhere in the source that modifies the dependencies either. This is one major difference between caching and checkpointing: a checkpoint really does change the RDD lineage.

2.5 Caching Is Application-Scoped

Section 2.4 argued that caching does not sever the RDD lineage, but caching does cut short the execution of the computation (the compute methods) along the dependency chain. An RDD's data depends on the results computed by its parent RDD, but if the current RDD's data is already cached it can be read straight from the cache, without invoking the parent's computation. In short, caching does not sever the RDD lineage, but it does sever the RDD computation.
The test code contains two actions: collect() is called after reduceByKey and again after groupByKey. Both resulting jobs ultimately depend on the MapPartitionsRDD, yet the computation logic passed into it runs only once (the --------map---------- lines in the log in 2.1 appear only for the first job).
This shows that the second action read mapRDD (the MapPartitionsRDD) directly from the cache and did not trigger the computation of mapRDD or its parents.
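The same observation can be made quantitative with a LongAccumulator (a sketch reusing rdd and sparkContext from 1.1; the accumulator name is arbitrary):

val mapCalls = sparkContext.longAccumulator("mapCalls")
val counted = rdd.map { num => mapCalls.add(1); (num, 1) }
counted.persist()

counted.reduceByKey(_ + _).collect()  // first job: the map function runs, mapCalls becomes 7
counted.groupByKey(2).collect()       // second job: partitions are read from the cache
println(mapCalls.value)               // still 7, not 14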
One more question remains: different jobs within the same application run serially and independently, and in yarn-cluster mode the two jobs' tasks may not even run on the same machine, so how does a later job read the RDD cache produced by an earlier one?
Look again at the BlockManager.get method shown in 2.3, which fetches a cached block.
Fetching a cached block can be local or remote, so it does not matter if the two jobs do not run on the same machine: the block can be fetched remotely. Caching does not require a storage path to be specified; Spark stores cached data under the application's temporary directories, so every job in the application can access it until the application ends and the cache is removed. Even when the storage level is StorageLevel.MEMORY_ONLY, reading cached data may still involve I/O, including network I/O.
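Within an application the cache can also be released explicitly instead of waiting for the application to end (a short sketch; unpersist and getPersistentRDDs are public API):

mapRDD.unpersist(blocking = true)                            // drop the cached blocks now
println(sparkContext.getPersistentRDDs.contains(mapRDD.id))  // false: no longer registered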

3. Checkpoints

A checkpoint requires a storage location; if none has been set, an error is thrown. The location is set with:

sparkContext.setCheckpointDir(directory: String)
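A typical usage pattern is sketched below (the HDFS path is hypothetical; on a cluster the directory must live on reliable storage such as HDFS rather than on a local disk):

// The directory must be set before checkpoint() is called.
sparkContext.setCheckpointDir("hdfs://namenode:8020/tmp/ckpt")  // hypothetical path

val data = sparkContext.parallelize(1 to 100).map(x => (x % 10, x))
data.persist()      // recommended: cache first so the checkpoint job can read the cache
data.checkpoint()   // must be called before the first action on this RDD
data.count()        // the action's job runs, then a second job writes the checkpoint files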

3.1 Key Logs from the Run

21/12/19 10:32:05 INFO WordCount: ---log1---(2) MapPartitionsRDD[1] at map at WordCount.scala:17 [Memory Deserialized 1x Replicated]
 |  ParallelCollectionRDD[0] at parallelize at WordCount.scala:16 [Memory Deserialized 1x Replicated]
21/12/19 10:32:05 INFO WordCount: ---log2---(2) ShuffledRDD[2] at reduceByKey at WordCount.scala:23 []
 +-(2) MapPartitionsRDD[1] at map at WordCount.scala:17 []
    |  ParallelCollectionRDD[0] at parallelize at WordCount.scala:16 []
--------map----------
--------map----------
--------map----------
--------map----------
--------map----------
--------map----------
--------map----------
21/12/19 10:33:58 INFO WordCount: +++log3+++(2) MapPartitionsRDD[1] at map at WordCount.scala:17 [Memory Deserialized 1x Replicated]
 |       CachedPartitions: 2; MemorySize: 288.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
 |  ReliableCheckpointRDD[3] at collect at WordCount.scala:26 [Memory Deserialized 1x Replicated]
21/12/19 10:33:58 INFO WordCount: +++log4+++(2) ShuffledRDD[2] at reduceByKey at WordCount.scala:23 []
 +-(2) MapPartitionsRDD[1] at map at WordCount.scala:17 []
    |      CachedPartitions: 2; MemorySize: 288.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
    |  ReliableCheckpointRDD[3] at collect at WordCount.scala:26 []
---log5---
org.apache.spark.rdd.MapPartitionsRDD
org.apache.spark.rdd.ReliableCheckpointRDD
---log6---
org.apache.spark.rdd.ShuffledRDD
org.apache.spark.rdd.MapPartitionsRDD
org.apache.spark.rdd.ReliableCheckpointRDD

3.2 Tracing the checkpoint Source Code

Start from def checkpoint(): Unit = RDDCheckpointData.synchronized in RDD.scala:

/**
 * Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint
 * directory set with `SparkContext#setCheckpointDir` and all references to its parent
 * RDDs will be removed. This function must be called before any job has been
 * executed on this RDD. It is strongly recommended that this RDD is persisted in
 * memory, otherwise saving it on a file will require recomputation.
 */
def checkpoint(): Unit = RDDCheckpointData.synchronized {
  // NOTE: we use a global lock here due to complexities downstream with ensuring
  // children RDD partitions point to the correct parent partitions. In the future
  // we should revisit this consideration.
  if (context.checkpointDir.isEmpty) {
    throw new SparkException("Checkpoint directory has not been set in the SparkContext")
  } else if (checkpointData.isEmpty) {
    checkpointData = Some(new ReliableRDDCheckpointData(this))
  }
}

At this point only the checkpoint directory is checked and the checkpointData object is initialized (it associates the RDD with a ReliableRDDCheckpointData object; checkpoint results such as the path are later stored in that object). No data is written yet. Data only exists once a job runs, so next follow the job submission and execution path.
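The two phases can also be seen from the public API (a sketch; isCheckpointed and getCheckpointFile are public methods on RDD):

mapRDD.checkpoint()
println(mapRDD.isCheckpointed)     // false: only checkpointData has been created so far
mapRDD.count()                     // after this job finishes, doCheckpoint() writes the files
println(mapRDD.isCheckpointed)     // true
println(mapRDD.getCheckpointFile)  // Some(<path under the directory set by setCheckpointDir>)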


/**
 * Run a function on a given set of partitions in an RDD and pass the results to the given
 * handler function. This is the main entry point for all actions in Spark.
 *
 * @param rdd target RDD to run tasks on
 * @param func a function to run on each partition of the RDD
 * @param partitions set of partitions to run on; some jobs may not want to compute on all
 * partitions of the target RDD, e.g. for operations like `first()`
 * @param resultHandler callback to pass each result to
 */
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
  if (stopped.get()) {
    throw new IllegalStateException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  if (conf.getBoolean("spark.logLineage", false)) {
    logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
  }
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  progressBar.foreach(_.finishAll())
  rdd.doCheckpoint()
}

Now look at rdd.doCheckpoint():

/**
 * Performs the checkpointing of this RDD by saving this. It is called after a job using this RDD
 * has completed (therefore the RDD has been materialized and potentially stored in memory).
 * doCheckpoint() is called recursively on the parent RDDs.
 */
private[spark] def doCheckpoint(): Unit = {
  RDDOperationScope.withScope(sc, "checkpoint", allowNesting = false, ignoreParent = true) {
    if (!doCheckpointCalled) {
      doCheckpointCalled = true
      if (checkpointData.isDefined) {
        if (checkpointAllMarkedAncestors) {
          // TODO We can collect all the RDDs that needs to be checkpointed, and then checkpoint
          // them in parallel.
          // Checkpoint parents first because our lineage will be truncated after we
          // checkpoint ourselves
          dependencies.foreach(_.rdd.doCheckpoint())
        }
        checkpointData.get.checkpoint()
      } else {
        dependencies.foreach(_.rdd.doCheckpoint())
      }
    }
  }
}

A configuration parameter comes into play here:

spark.checkpoint.checkpointAllMarkedAncestors 

Its default value is false; it controls whether the checkpoints of marked ancestor RDDs are also performed recursively.
If the current RDD is not marked for checkpointing, doCheckpoint() is simply called recursively on its parent RDDs.
If the current RDD is marked for checkpointing, spark.checkpoint.checkpointAllMarkedAncestors decides whether marked ancestors should be checkpointed as well; if so, the parents are checkpointed first (recursively, so that parents are written before their children).
checkpointData.get returns the new ReliableRDDCheckpointData(this) object created earlier, where this is the RDD being checkpointed.
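In the Spark versions examined here the flag is read through sc.getLocalProperty rather than from SparkConf, so enabling it looks like the one-line sketch below (unset behaves as false):

// Checkpoint every RDD marked with checkpoint(), not only the RDD the action ran on.
sparkContext.setLocalProperty("spark.checkpoint.checkpointAllMarkedAncestors", "true")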

RDDCheckpointData.scala

/**
 * Materialize this RDD and persist its content.
 * This is called immediately after the first action invoked on this RDD has completed.
 */
final def checkpoint(): Unit = {
  // Guard against multiple threads checkpointing the same RDD by
  // atomically flipping the state of this RDDCheckpointData
  RDDCheckpointData.synchronized {
    if (cpState == Initialized) {
      cpState = CheckpointingInProgress
    } else {
      return
    }
  }

  val newRDD = doCheckpoint()

  // Update our state and truncate the RDD lineage
  RDDCheckpointData.synchronized {
    cpRDD = Some(newRDD)
    cpState = Checkpointed
    rdd.markCheckpointed()
  }
}

protected def doCheckpoint(): CheckpointRDD[T]

doCheckpoint() is an abstract method, so look at its subclass implementation.
There is only one subclass: ReliableRDDCheckpointData.scala.


/**
 * Materialize this RDD and write its content to a reliable DFS.
 * This is called immediately after the first action invoked on this RDD has completed.
 */
protected override def doCheckpoint(): CheckpointRDD[T] = {
  val newRDD = ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd, cpDir)


  // Optionally clean our checkpoint files if the reference is out of scope
  if (rdd.conf.get(CLEANER_REFERENCE_TRACKING_CLEAN_CHECKPOINTS)) {
    rdd.context.cleaner.foreach { cleaner =>
      cleaner.registerRDDCheckpointDataForCleanup(newRDD, rdd.id)
    }
  }

  logInfo(s"Done checkpointing RDD ${rdd.id} to $cpDir, new parent is RDD ${newRDD.id}")
  newRDD
}


This calls the writeRDDToCheckpointDirectory function on the ReliableCheckpointRDD companion object:


/**
 * Write RDD to checkpoint files and return a ReliableCheckpointRDD representing the RDD.
 */
def writeRDDToCheckpointDirectory[T: ClassTag](
    originalRDD: RDD[T],
    checkpointDir: String,
    blockSize: Int = -1): ReliableCheckpointRDD[T] = {
  val checkpointStartTimeNs = System.nanoTime()

  val sc = originalRDD.sparkContext

  // Create the output path for the checkpoint
  val checkpointDirPath = new Path(checkpointDir)
  val fs = checkpointDirPath.getFileSystem(sc.hadoopConfiguration)
  if (!fs.mkdirs(checkpointDirPath)) {
    throw new SparkException(s"Failed to create checkpoint path $checkpointDirPath")
  }

  // Save to file, and reload it as an RDD
  val broadcastedConf = sc.broadcast(
    new SerializableConfiguration(sc.hadoopConfiguration))
  // TODO: This is expensive because it computes the RDD again unnecessarily (SPARK-8582)
  sc.runJob(originalRDD,
    writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf) _)

  if (originalRDD.partitioner.nonEmpty) {
    writePartitionerToCheckpointDir(sc, originalRDD.partitioner.get, checkpointDirPath)
  }




  val checkpointDurationMs =
    TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - checkpointStartTimeNs)
  logInfo(s"Checkpointing took $checkpointDurationMs ms.")




  val newRDD = new ReliableCheckpointRDD[T](
    sc, checkpointDirPath.toString, originalRDD.partitioner)
  if (newRDD.partitions.length != originalRDD.partitions.length) {
    throw new SparkException(
      "Checkpoint RDD has a different number of partitions from original RDD. Original " +
        s"RDD [ID: ${originalRDD.id}, num of partitions: ${originalRDD.partitions.length}]; " +
        s"Checkpoint RDD [ID: ${newRDD.id}, num of partitions: " +
        s"${newRDD.partitions.length}].")
  }
  newRDD
}

A job is submitted here (and if spark.checkpoint.checkpointAllMarkedAncestors mentioned above is enabled, every marked parent RDD is checkpointed one by one, which can be very costly). It is as if the RDD being checkpointed ran an action whose work is writePartitionToCheckpointFile, writing each partition to the checkpoint directory. Conceptually:

mapRDD.checkpoint()
mapRDD.collect(ReliableCheckpointRDD.writeRDDToCheckpointDirectory())

The two lines above are roughly equivalent in spirit to what happens, although the second line is certainly not valid syntax. So is checkpoint an action or a transformation?

When the job has finished, the RDD's data has been written to the specified path (which involves I/O), and the partitioner, if there is one, is written by writePartitionerToCheckpointDir. A new ReliableCheckpointRDD is then created with the same SparkContext and partitioner as the original RDD and with the location of the stored data, and that ReliableCheckpointRDD object is returned.
Back at the call site:
RDDCheckpointData.scala


/**
 * Materialize this RDD and persist its content.
 * This is called immediately after the first action invoked on this RDD has completed.
 */
final def checkpoint(): Unit = {
  // Guard against multiple threads checkpointing the same RDD by
  // atomically flipping the state of this RDDCheckpointData
  RDDCheckpointData.synchronized {
    if (cpState == Initialized) {
      cpState = CheckpointingInProgress
    } else {
      return
    }
  }


  val newRDD = doCheckpoint()


  // Update our state and truncate the RDD lineage
  RDDCheckpointData.synchronized {
    cpRDD = Some(newRDD)
    cpState = Checkpointed
    rdd.markCheckpointed()
  }
}


The state is updated, and then rdd.markCheckpointed() is executed:


/**
 * Changes the dependencies of this RDD from its original parents to a new RDD (`newRDD`)
 * created from the checkpoint file, and forget its old dependencies and partitions.
 */
private[spark] def markCheckpointed(): Unit = stateLock.synchronized {
  clearDependencies()
  partitions_ = null
  deps = null    // Forget the constructor argument for dependencies too
}


/**
 * Clears the dependencies of this RDD. This method must ensure that all references
 * to the original parent RDDs are removed to enable the parent RDDs to be garbage
 * collected. Subclasses of RDD may override this method for implementing their own cleaning
 * logic. See [[org.apache.spark.rdd.UnionRDD]] for an example.
 */
protected def clearDependencies(): Unit = stateLock.synchronized {
  dependencies_ = null
}

After an RDD such as mapRDD has been checkpointed, its dependencies on its parent RDDs are cut (dependencies_ = null). With the chain cut, which RDD is now the source of the DAG?
Look at RDD.scala:

final def dependencies: Seq[Dependency[_]] = {
  checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {
    if (dependencies_ == null) {
      stateLock.synchronized {
        if (dependencies_ == null) {
          dependencies_ = getDependencies
        }
      }
    }
    dependencies_
  }
}

Once an RDD has completed its checkpoint, its parent becomes the checkpoint RDD object ReliableCheckpointRDD through a OneToOneDependency, and the RDD itself becomes the second RDD in the DAG.
The change in the RDD lineage before and after checkpoint:
(figure: mapRDD's parent switches from ParallelCollectionRDD to ReliableCheckpointRDD)
This can also be seen in the logs from 3.1: log5 and log6 list ReliableCheckpointRDD rather than ParallelCollectionRDD as the source RDD.
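The switch is also easy to verify directly (a sketch reusing mapRDD from 1.1, with the checkpoint() line enabled):

println(mapRDD.dependencies.head.rdd)  // before any action: ParallelCollectionRDD[0]
mapRDD.checkpoint()
mapRDD.collect()                       // the action's job runs, then the checkpoint job
println(mapRDD.dependencies.head.rdd)  // now a ReliableCheckpointRDD via OneToOneDependency
println(mapRDD.toDebugString)          // the lineage now starts from the checkpoint RDD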

3.3 Checkpoints Are Independent of the Application

A Spark checkpoint requires an explicitly specified storage location, and the checkpoint files are not deleted when the application finishes.
SparkContext actually also has a function that creates a source RDD from a checkpoint directory, but it is package-private (protected[spark]).
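Because of that access modifier, reading an existing checkpoint directory back from a new application needs a small helper placed inside the org.apache.spark package (a sketch under that assumption; the directory in the usage comment is hypothetical):

package org.apache.spark   // required: SparkContext.checkpointFile is protected[spark]

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

object CheckpointReader {
  // Re-create an RDD from the files a previous application wrote into a checkpoint directory.
  def read[T: ClassTag](sc: SparkContext, dir: String): RDD[T] = sc.checkpointFile[T](dir)
}

// usage (hypothetical path): CheckpointReader.read[(Int, Int)](sc, "/tmp/ckpt/<uuid>/rdd-1")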

4. Execution Order of Checkpoint and Cache

Return once more to the place where an RDD's data is obtained:

/**
 * Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
 * This should ''not'' be called by users directly, but is available for implementors of custom
 * subclasses of RDD.
 */
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    getOrCompute(split, context)
  } else {
    computeOrReadCheckpoint(split, context)
  }
}

private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
{
  if (isCheckpointedAndMaterialized) {
    firstParent[T].iterator(split, context)
  } else {
    compute(split, context)
  }
}

If the RDD is cached, the data is read from the cache; if caching had failed, the data is recomputed and caching is attempted again.
If there is no cache but there is a checkpoint, the data is read from the checkpoint via firstParent[T].iterator(split, context); firstParent is the ReliableCheckpointRDD, and the current RDD's compute method is not executed.
What if there are both a checkpoint and a cache? Does the order in which the two were requested matter?
The job submission method in DAGScheduler.scala returns a JobWaiter object that blocks the current (driver) thread until the job finishes and can also be used to cancel the job, so jobs are submitted serially. (After the driver submits a job, the subsequent work is handled by a daemon thread named "dag-scheduler-event-loop".)
Looking again at SparkContext.runJob (shown in 3.2), rdd.doCheckpoint() is called only after dagScheduler.runJob returns.
The job produced by the checkpoint therefore runs after the job produced by the action, so the cache is always populated before the checkpoint is written, regardless of the order in which the two were requested. When the checkpoint job fetches mapRDD's data, the data is already cached, so compute is not triggered and the data is read straight from the cache.

5. Summary

1. Caching (cache/persist) does not change the RDD lineage, but it does cut the computation along the dependency chain: if an RDD is cached, the computation logic encapsulated in all of its upstream RDDs is not executed.
2. Spark caching is application-scoped: cached data is stored under the application's temporary directories (when disk storage is needed), so every job in the application can access it until the application ends and the cache is removed.
3. Even when the storage level is StorageLevel.MEMORY_ONLY, reading cached data may still involve I/O, including network I/O.
4. checkpoint changes the RDD lineage: once an RDD has completed its checkpoint, its parent becomes the ReliableCheckpointRDD through a OneToOneDependency, and the RDD itself becomes the second RDD in the DAG.
5. Checkpoints are independent of the application: the checkpoint directory is not deleted when the application ends, and SparkContext can create a ReliableCheckpointRDD from a checkpoint directory as the source of a DAG (the method is protected[spark]).
6. A checkpoint submits a job. checkpoint is an odd case: if the application has no other action, checkpoint submits no job either, so it looks like a transformation; if there is another action, checkpoint submits a job of its own as well, so it also looks like an action.
7. The cache is written before the checkpoint: the job produced by the checkpoint runs after the action's job, so the cache is always populated first, regardless of call order.
8. Checkpoints should be combined with caching: the checkpoint submits another job that would re-execute the RDD's computation, but if the RDD is cached, the result already computed by the action is read from the cache and simply written out, with no recomputation.
9. When there are multiple jobs, caching a shared RDD avoids recomputation.
10. With a single job but a very long DAG, caching expensive RDDs helps recover data when tasks fail.
