Spark source code walkthrough (7): tracing the caching and checkpoint mechanisms
1. Program entry point
1.1 Test code
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.slf4j.LoggerFactory

object WordCount {
  private val log = LoggerFactory.getLogger("WordCount")

  def main(args: Array[String]): Unit = {
    log.info("-------begin---------")
    val sparkConf = new SparkConf().setAppName("persistTest").setMaster("local[3]")
    val sparkContext = new SparkContext(sparkConf)
    sparkContext.setCheckpointDir("./")
    val rdd = sparkContext.parallelize(Array(1, 1, 2, 3, 4, 4, 5), 2)
    val mapRDD = rdd.map { num =>
      println("--------map----------")
      (num, 1)
    }
    mapRDD.persist()
    // mapRDD.checkpoint() // uncomment to reproduce the checkpoint run in section 3
    val shuffledRDD = mapRDD.reduceByKey(_ + _)
    log.info("---log1---" + mapRDD.toDebugString)
    log.info("---log2---" + shuffledRDD.toDebugString)
    shuffledRDD.collect().foreach(x => log.info(s"${x._1}:${x._2}"))
    log.info("log3 " + mapRDD.toDebugString)
    log.info("log4 " + shuffledRDD.toDebugString)
    println("---log5---")
    printRDD(mapRDD)
    println("---log6---")
    printRDD(shuffledRDD)
    val groupRDD = mapRDD.groupByKey(2)
    groupRDD.collect().foreach(println)
    sparkContext.stop()
  }

  // Recursively prints the class name of every RDD along the dependency chain.
  def printRDD(rdd: RDD[_]): Unit = {
    println(rdd.getClass.getName)
    rdd.dependencies.foreach(dep => printRDD(dep.rdd))
  }
}
1.2 Dependencies between the operations
1.3 Dependencies between the generated RDDs
2. Tracing Spark cache and persist in the source
2.1 Key log output with caching
21/12/19 10:02:25 INFO WordCount: ---log1---(2) MapPartitionsRDD[1] at map at WordCount.scala:17 [Memory Deserialized 1x Replicated]
| ParallelCollectionRDD[0] at parallelize at WordCount.scala:16 [Memory Deserialized 1x Replicated]
21/12/19 10:02:25 INFO WordCount: ---log2---(2) ShuffledRDD[2] at reduceByKey at WordCount.scala:23 []
-(2) MapPartitionsRDD[1] at map at WordCount.scala:17 []
| ParallelCollectionRDD[0] at parallelize at
--------map----------
--------map----------
--------map----------
--------map----------
--------map----------
--------map----------
--------map----------
21/12/19 10:02:27 INFO WordCount: 4:2
21/12/19 10:02:27 INFO WordCount: 2:1
21/12/19 10:02:27 INFO WordCount: 1:2
21/12/19 10:02:27 INFO WordCount: 3:1
21/12/19 10:02:27 INFO WordCount: 5:1
21/12/19 10:02:27 INFO WordCount: log3 (2) MapPartitionsRDD[1] at map at WordCount.scala:17 [Memory Deserialized 1x Replicated]
| CachedPartitions: 2; MemorySize: 288.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
| ParallelCollectionRDD[0] at parallelize at WordCount.scala:16 [Memory Deserialized 1x Replicated]
21/12/19 10:02:27 INFO WordCount: log4 (2) ShuffledRDD[2] at reduceByKey at WordCount.scala:23 []
-(2) MapPartitionsRDD[1] at map at WordCount.scala:17 []
| CachedPartitions: 2; MemorySize: 288.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
| ParallelCollectionRDD[0] at parallelize at WordCount.scala:16 []
---log5---
org.apache.spark.rdd.MapPartitionsRDD
org.apache.spark.rdd.ParallelCollectionRDD
---log6---
org.apache.spark.rdd.ShuffledRDD
org.apache.spark.rdd.MapPartitionsRDD
org.apache.spark.rdd.ParallelCollectionRDD
2.2 Registering the RDD to be cached
/**
 * Persist this RDD with the default storage level (`MEMORY_ONLY`).
 */
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

/**
 * Persist this RDD with the default storage level (`MEMORY_ONLY`).
 */
def cache(): this.type = persist()
In other words, cache() is simply persist() with storage level MEMORY_ONLY.
/**
 * Set this RDD's storage level to persist its values across operations after the first time
 * it is computed. This can only be used to assign a new storage level if the RDD does not
 * have a storage level set yet. Local checkpointing is an exception.
 */
def persist(newLevel: StorageLevel): this.type = {
  if (isLocallyCheckpointed) {
    // This means the user previously called localCheckpoint(), which should have already
    // marked this RDD for persisting. Here we should override the old storage level with
    // one that is explicitly requested by the user (after adapting it to use disk).
    persist(LocalRDDCheckpointData.transformStorageLevel(newLevel), allowOverride = true)
  } else {
    persist(newLevel, allowOverride = false)
  }
}
Leaving the checkpoint case aside for now, look at the method persist(newLevel: StorageLevel, allowOverride: Boolean): this.type:
RDD.scala
private var storageLevel: StorageLevel = StorageLevel.NONE

/**
 * Mark this RDD for persisting using the specified level.
 *
 * @param newLevel the target storage level
 * @param allowOverride whether to override any existing level with the new one
 */
private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = {
  // TODO: Handle changes of StorageLevel
  if (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) {
    throw new UnsupportedOperationException(
      "Cannot change storage level of an RDD after it was already assigned a level")
  }
  // If this is the first time this RDD is marked for persisting, register it
  // with the SparkContext for cleanups and accounting. Do this only once.
  if (storageLevel == StorageLevel.NONE) {
    sc.cleaner.foreach(_.registerRDDForCleanup(this))
    sc.persistRDD(this)
  }
  storageLevel = newLevel
  this
}
SparkContext.scala
/**
 * Register an RDD to be persisted in memory and/or disk storage
 */
private[spark] def persistRDD(rdd: RDD[_]): Unit = {
  persistentRdds(rdd.id) = rdd
}

// Keeps track of all persisted RDDs
private[spark] val persistentRdds = {
  val map: ConcurrentMap[Int, RDD[_]] = new MapMaker().weakValues().makeMap[Int, RDD[_]]()
  map.asScala
}
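All this does is register the RDD with the SparkContext and update the storageLevel field. No real work is done here, and none can be: there is no data yet, so there is nothing to cache. An RDD only encapsulates computation logic and holds no data itself; it becomes associated with data only when an action submits a job and triggers the computation. So next we follow the computation-related methods iterator and compute.
See also: Spark source code walkthrough (6): execution of RDD logic.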
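To make the bookkeeping concrete, here is a minimal pure-Scala sketch of the rules visible in persist(newLevel, allowOverride) above. PersistSketch, ToyContext, ToyRDD and the Level objects are hypothetical stand-ins, not Spark classes; the sketch only shows that marking a level records it, registers the RDD exactly once, and rejects a later, different level, all without touching any data.

```scala
import scala.collection.mutable

object PersistSketch {
  // Hypothetical stand-ins for a few StorageLevel values (not Spark's StorageLevel class).
  sealed trait Level
  case object NoneLevel extends Level
  case object MemoryOnly extends Level
  case object MemoryAndDisk extends Level

  // Plays the role of SparkContext.persistentRdds: RDD id -> RDD.
  final class ToyContext {
    val persistentRdds: mutable.Map[Int, ToyRDD] = mutable.Map.empty
  }

  final class ToyRDD(val id: Int, sc: ToyContext) {
    private var storageLevel: Level = NoneLevel
    def getStorageLevel: Level = storageLevel

    // Same checks as RDD.persist(newLevel, allowOverride): pure bookkeeping, no data is read.
    def persist(newLevel: Level, allowOverride: Boolean = false): this.type = {
      if (storageLevel != NoneLevel && newLevel != storageLevel && !allowOverride) {
        throw new UnsupportedOperationException(
          "Cannot change storage level of an RDD after it was already assigned a level")
      }
      if (storageLevel == NoneLevel) {
        sc.persistentRdds(id) = this // registered only the first time
      }
      storageLevel = newLevel
      this
    }

    // cache() is persist() with the MEMORY_ONLY level.
    def cache(): this.type = persist(MemoryOnly)
  }
}
```

Calling cache() twice is harmless because the level does not change; asking for a different level afterwards fails, just like the UnsupportedOperationException branch in Spark.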
2.3 How caching is implemented
RDD.scala
/**
 * Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
 * This should ''not'' be called by users directly, but is available for implementors of custom
 * subclasses of RDD.
 */
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    getOrCompute(split, context)
  } else {
    computeOrReadCheckpoint(split, context)
  }
}

/**
 * Gets or computes an RDD partition. Used by RDD.iterator() when an RDD is cached.
 */
private[spark] def getOrCompute(partition: Partition, context: TaskContext): Iterator[T] = {
  val blockId = RDDBlockId(id, partition.index)
  var readCachedBlock = true
  // This method is called on executors, so we need call SparkEnv.get instead of sc.env.
  SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, elementClassTag, () => {
    readCachedBlock = false
    computeOrReadCheckpoint(partition, context)
  }) match {
    case Left(blockResult) =>
      if (readCachedBlock) {
        val existingMetrics = context.taskMetrics().inputMetrics
        existingMetrics.incBytesRead(blockResult.bytes)
        new InterruptibleIterator[T](context, blockResult.data.asInstanceOf[Iterator[T]]) {
          override def next(): T = {
            existingMetrics.incRecordsRead(1)
            delegate.next()
          }
        }
      } else {
        new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
      }
    case Right(iter) =>
      new InterruptibleIterator(context, iter.asInstanceOf[Iterator[T]])
  }
}

/**
 * Compute an RDD partition or read it from a checkpoint if the RDD is checkpointing.
 */
private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
{
  if (isCheckpointedAndMaterialized) {
    firstParent[T].iterator(split, context)
  } else {
    compute(split, context)
  }
}
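storageLevel was already changed above to the user-specified level (StorageLevel.MEMORY_ONLY if none was given), so it is no longer StorageLevel.NONE and getOrCompute(split, context) is executed here.
The fourth argument of SparkEnv.get.blockManager.getOrElseUpdate is an anonymous function that takes no arguments and returns an iterator over the data. Its body calls computeOrReadCheckpoint, which invokes the current RDD's concrete compute method and returns the computed data as an iterator. In other words, this anonymous function computes and returns the current RDD's data.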
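The important detail is that the anonymous function is only a recipe for computing the partition; it runs only on a cache miss. The following sketch models that with a plain in-memory map. GetOrComputeSketch, ToyBlockManager, ToyRDD, the persisted flag and the string block ids are hypothetical simplifications (no memory limits, no locking, no metrics), not Spark APIs.

```scala
import scala.collection.mutable

object GetOrComputeSketch {
  type Rec = (Int, Int)

  // Hypothetical block store keyed by names such as "rdd_1_0" (RDD 1, partition 0).
  final class ToyBlockManager {
    private val blocks = mutable.Map.empty[String, Vector[Rec]]

    // Hit: return the stored values, makeIterator is never invoked.
    // Miss: invoke makeIterator, store the values, then return them.
    def getOrElseUpdate(blockId: String, makeIterator: () => Iterator[Rec]): Iterator[Rec] =
      blocks.get(blockId) match {
        case Some(values) => values.iterator
        case None =>
          val values = makeIterator().toVector
          blocks(blockId) = values
          values.iterator
      }
  }

  final class ToyRDD(id: Int, data: Map[Int, Seq[Int]], bm: ToyBlockManager) {
    var persisted = false // stands in for storageLevel != StorageLevel.NONE
    var computeCalls = 0

    // Stands in for compute(): the user's map logic for one partition.
    private def compute(partition: Int): Iterator[Rec] = {
      computeCalls += 1
      data(partition).iterator.map(n => (n, 1))
    }

    // Same branch as RDD.iterator: only persisted RDDs go through the block manager,
    // and the compute step is handed over as a thunk instead of being called up front.
    def iterator(partition: Int): Iterator[Rec] =
      if (persisted) bm.getOrElseUpdate(s"rdd_${id}_$partition", () => compute(partition))
      else compute(partition)
  }
}
```

Reading the same partition twice from a persisted ToyRDD calls compute once; an unpersisted one calls it on every read.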
BlockManager.scala
/**
 * Retrieve the given block if it exists, otherwise call the provided `makeIterator` method
 * to compute the block, persist it, and return its values.
 *
 * @return either a BlockResult if the block was successfully cached, or an iterator if the block
 *         could not be cached.
 */
def getOrElseUpdate[T](
    blockId: BlockId,
    level: StorageLevel,
    classTag: ClassTag[T],
    makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
  // Attempt to read the block from local or remote storage. If it's present, then we don't need
  // to go through the local-get-or-put path.
  get[T](blockId)(classTag) match {
    case Some(block) =>
      return Left(block)
    case _ =>
      // Need to compute the block.
  }
  // Initially we hold no locks on this block.
  doPutIterator(blockId, makeIterator, level, classTag, keepReadLock = true) match {
    case None =>
      // doPut() didn't hand work back to us, so the block already existed or was successfully
      // stored. Therefore, we now hold a read lock on the block.
      val blockResult = getLocalValues(blockId).getOrElse {
        // Since we held a read lock between the doPut() and get() calls, the block should not
        // have been evicted, so get() not returning the block indicates some internal error.
        releaseLock(blockId)
        throw new SparkException(s"get() failed for block $blockId even though we held a lock")
      }
      // We already hold a read lock on the block from the doPut() call and getLocalValues()
      // acquires the lock again, so we need to call releaseLock() here so that the net number
      // of lock acquisitions is 1 (since the caller will only call release() once).
      releaseLock(blockId)
      Left(blockResult)
    case Some(iter) =>
      // The put failed, likely because the data was too large to fit in memory and could not be
      // dropped to disk. Therefore, we need to pass the input iterator back to the caller so
      // that they can decide what to do with the values (e.g. process them without caching).
      Right(iter)
  }
}

/**
 * Get a block from the block manager (either local or remote).
 *
 * This acquires a read lock on the block if the block was stored locally and does not acquire
 * any locks if the block was fetched from a remote block manager. The read lock will
 * automatically be freed once the result's `data` iterator is fully consumed.
 */
def get[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
  val local = getLocalValues(blockId)
  if (local.isDefined) {
    logInfo(s"Found block $blockId locally")
    return local
  }
  val remote = getRemoteValues[T](blockId)
  if (remote.isDefined) {
    logInfo(s"Found block $blockId remotely")
    return remote
  }
  None
}
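Continuing into SparkEnv.get.blockManager.getOrElseUpdate: the value of its fourth parameter, makeIterator, is the function that returns the RDD's data (it has not been called at this point).
Inside the method, the block is first looked up in the cache. If it is found, the result is returned immediately (makeIterator is never called, so the RDD's compute is never triggered). Otherwise the caching path runs. That call chain is very involved (memory management, disk IO and so on), so we will not trace it in depth.
It is enough to know that this method both stores and reads cached data: if the block is already cached, the cached result is returned directly; otherwise the data is computed first and then cached according to the storage level. If the block already existed or was cached successfully, it returns a Left(BlockResult); if caching fails, it returns a Right(Iterator[T]), handing the computed RDD data back to the caller uncached.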
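The Left/Right contract can be sketched as follows, assuming a toy memory-only store that rejects any block with more than `capacity` records. EitherResultSketch, ToyMemoryStore and ToyBlockResult are hypothetical; unlike Spark's doPutIterator, which unrolls the iterator incrementally and may fall back to disk, this sketch materializes the whole partition before deciding.

```scala
import scala.collection.mutable

object EitherResultSketch {
  // Hypothetical stand-in for BlockResult: the cached values plus their size in records.
  final case class ToyBlockResult(data: Vector[Int], size: Int)

  // Memory-only toy store that refuses blocks larger than `capacity` records.
  final class ToyMemoryStore(capacity: Int) {
    private val blocks = mutable.Map.empty[String, Vector[Int]]

    def get(id: String): Option[ToyBlockResult] =
      blocks.get(id).map(v => ToyBlockResult(v, v.size))

    // Left  = the block was already cached or has just been stored.
    // Right = the block could not be stored; the computed values go back to the caller uncached.
    def getOrElseUpdate(id: String, makeIterator: () => Iterator[Int]): Either[ToyBlockResult, Iterator[Int]] =
      get(id) match {
        case Some(block) => Left(block)
        case None =>
          val values = makeIterator().toVector
          if (values.size <= capacity) {
            blocks(id) = values
            Left(ToyBlockResult(values, values.size))
          } else {
            Right(values.iterator)
          }
      }
  }
}
```

A caller that receives Right still gets correct data for the current task; it simply cannot reuse it later, which is why a failed cache only costs recomputation, never correctness.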
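2.4 Does caching rebuild the RDD lineage?
Some articles online claim that once caching succeeds, the RDD lineage is rebuilt, citing the fact that the dependency chain printed by toDebugString gains a CachedPartitions entry. Comparing log1/log2 with log3/log4 does show an added CachedPartitions line, but that does not mean the RDD's dependencies have changed.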
21/12/19 09:12:49 INFO WordCount: ---log1---(2) MapPartitionsRDD[1] at map at WordCount.scala:17 [Memory Deserialized 1x Replicated]
| ParallelCollectionRDD[0] at parallelize at WordCount.scala:16 [Memory Deserialized 1x Replicated]
21/12/19 09:12:50 INFO WordCount: ---log2---(2) ShuffledRDD[2] at reduceByKey at WordCount.scala:23 []
-(2) MapPartitionsRDD[1] at map at WordCount.scala:17 []
| ParallelCollectionRDD[0] at parallelize at WordCount.scala:16 []
21/12/19 09:12:51 INFO WordCount: log3 (2) MapPartitionsRDD[1] at map at WordCount.scala:17 [Memory Deserialized 1x Replicated]
| CachedPartitions: 2; MemorySize: 288.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
| ReliableCheckpointRDD[3] at collect at WordCount.scala:26 [Memory Deserialized 1x Replicated]
21/12/19 09:12:51 INFO WordCount: log4 (2) ShuffledRDD[2] at reduceByKey at WordCount.scala:23 []
-(2) MapPartitionsRDD[1] at map at WordCount.scala:17 []
| CachedPartitions: 2; MemorySize: 288.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
| ReliableCheckpointRDD[3] at collect at WordCount.scala:26 []
---log5---
org.apache.spark.rdd.MapPartitionsRDD
org.apache.spark.rdd.ParallelCollectionRDD
---log6---
org.apache.spark.rdd.ShuffledRDD
org.apache.spark.rdd.MapPartitionsRDD
org.apache.spark.rdd.ParallelCollectionRDD
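I believe this claim is wrong: toDebugString merely prints the CachedPartitions information; it does not add a real node to the dependency chain.
log5 and log6 show that the RDD lineage is unchanged, and nothing in the source touches the value of dependencies. This is one of the big differences between caching and checkpointing: checkpointing really does change the RDD lineage.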
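The difference between what toDebugString prints and what dependencies contains can be sketched like this. DebugStringSketch, Node, lineage (the analogue of printRDD in the test code) and debugString are hypothetical simplifications; the only assumption carried over is that the CachedPartitions line comes from a separate storage-status lookup, as the log output suggests, rather than from the dependency list.

```scala
object DebugStringSketch {
  // Hypothetical lineage node: a name and its parents (the "dependencies").
  final case class Node(name: String, deps: List[Node])

  // Like printRDD: walk the real dependency chain only.
  def lineage(n: Node): List[String] = n.name :: n.deps.flatMap(lineage)

  // Like toDebugString: the CachedPartitions line is looked up in a separate status map
  // and printed under the node, but it is never added to `deps`.
  def debugString(n: Node, cachedPartitions: Map[String, Int], indent: String = ""): List[String] = {
    val cacheLine = cachedPartitions.get(n.name).map(p => s"$indent| CachedPartitions: $p").toList
    (s"$indent${n.name}" :: cacheLine) ++
      n.deps.flatMap(d => debugString(d, cachedPartitions, indent + "  "))
  }
}
```

Caching only changes the status map passed to debugString; lineage, which is what printRDD effectively reports in log5 and log6, stays the same.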
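2.5 Caches are application-scoped
As 2.4 showed, caching does not truncate the RDD lineage, but it does cut off the execution of the logical computation (the compute methods) along the dependency chain. An RDD's data depends on the result of its parent's computation, but if the current RDD's data has already been cached, it can be read directly from the cache without invoking the parent's computation. So caching does not truncate the lineage, but it does truncate the computation.
The test code has two actions: collect() after reduceByKey and collect() after groupByKey. Both resulting jobs start from the MapPartitionsRDD, yet the logic passed to map ran only once per element: the "--------map----------" lines are printed only during the first job.
This shows that the second action read the cache of mapRDD (a MapPartitionsRDD) directly, without triggering the computation of mapRDD or its parents.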
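A small simulation of the two jobs in the test code, assuming a toy per-partition cache in place of the BlockManager. SharedCacheSketch and its run method are hypothetical; only the 7 input numbers and the 2 partitions come from the test code. With the map stage cached, the map function runs 7 times in total (once per element, matching the seven "--------map----------" lines); without caching, the second job runs it again.

```scala
import scala.collection.mutable

object SharedCacheSketch {
  // The 7 input numbers from the test code, split into 2 partitions.
  val input: Vector[Vector[Int]] = Vector(Vector(1, 1, 2), Vector(3, 4, 4, 5))

  // Runs two "jobs" over the same map stage (a reduceByKey-like one and a groupByKey-like one)
  // and returns both results plus how many times the map function ran.
  def run(cacheMapStage: Boolean): (Map[Int, Int], Map[Int, List[Int]], Int) = {
    var mapCalls = 0
    val cache = mutable.Map.empty[Int, Vector[(Int, Int)]] // partition index -> cached block

    def computeMap(p: Int): Vector[(Int, Int)] = input(p).map { n => mapCalls += 1; (n, 1) }
    def mapPartition(p: Int): Vector[(Int, Int)] =
      if (cacheMapStage) cache.getOrElseUpdate(p, computeMap(p)) else computeMap(p)
    def mapStage(): Seq[(Int, Int)] = input.indices.flatMap(mapPartition)

    val reduced = mapStage().groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).sum }    // job 1
    val grouped = mapStage().groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).toList } // job 2
    (reduced, grouped, mapCalls)
  }
}
```

The reduce result matches the 4:2, 2:1, 1:2, 3:1, 5:1 lines in the log.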
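One more question remains:
Jobs within the same application run serially and independently. In yarn-cluster mode the two jobs may not even run on the same machine, so how can a later job read an RDD cache produced by an earlier one?
Look again at the source that fetches a cached block, BlockManager.get() shown in 2.3:
A cached block can be fetched either locally or remotely, so it does not matter if the two jobs run on different machines: the later one fetches the block remotely. You do not specify a storage path when caching; Spark keeps caches in the application's temporary storage, so every job in the application can access them until the application ends and the caches are deleted. This also means that even with storage level StorageLevel.MEMORY_ONLY, reading cached data may involve IO, namely network IO.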
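The lookup order in BlockManager.get (local store first, then remote block managers) is what lets a later job use a block cached on another executor. A sketch, assuming a toy cluster modeled as a map from executor id to its blocks; LocalRemoteSketch, ToyCluster and the executor and block names are hypothetical, and in Spark the remote branch is an actual network fetch rather than a map lookup.

```scala
object LocalRemoteSketch {
  // Hypothetical cluster: executor id -> (block id -> cached values).
  final class ToyCluster(stores: Map[String, Map[String, Vector[Int]]]) {
    // Same order as BlockManager.get: the executor's own store first, then the others.
    def get(executor: String, blockId: String): Option[(String, Vector[Int])] =
      stores.get(executor).flatMap(_.get(blockId)).map(v => ("local", v))
        .orElse {
          stores.iterator
            .filter { case (exec, _) => exec != executor }
            .collectFirst { case (_, blocks) if blocks.contains(blockId) => ("remote", blocks(blockId)) }
        }
  }
}
```

A task running on exec-2 still finds a block cached by exec-1, only through the (slower) remote path.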
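3. Checkpointing
A checkpoint requires a storage directory; if none has been set, checkpoint() throws an exception. The directory is set with: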
sparkContext.setCheckpointDir(directory: String)
3.1 Key log output
21/12/19 10:32:05 INFO WordCount: ---log1---(2) MapPartitionsRDD[1] at map at WordCount.scala:17 [Memory Deserialized 1x Replicated]
| ParallelCollectionRDD[0] at parallelize at WordCount.scala:16 [Memory Deserialized 1x Replicated]
21/12/19 10:32:05 INFO WordCount: ---log2---(2) ShuffledRDD[2] at reduceByKey at WordCount.scala:23 []
-(2) MapPartitionsRDD[1] at map at WordCount.scala:17 []
| ParallelCollectionRDD[0] at parallelize at WordCount.scala:16 []
--------map----------
--------map----------
--------map----------
--------map----------
--------map----------
--------map----------
--------map----------
21/12/19 10:33:58 INFO WordCount: log3 (2) MapPartitionsRDD[1] at map at WordCount.scala:17 [Memory Deserialized 1x Replicated]
| CachedPartitions: 2; MemorySize: 288.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
| ReliableCheckpointRDD[3] at collect at WordCount.scala:26 [Memory Deserialized 1x Replicated]
21/12/19 10:33:58 INFO WordCount: log4 (2) ShuffledRDD[2] at reduceByKey at WordCount.scala:23 []
-(2) MapPartitionsRDD[1] at map at WordCount.scala:17 []
| CachedPartitions: 2; MemorySize: 288.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
| ReliableCheckpointRDD[3] at collect at WordCount.scala:26 []
---log5---
org.apache.spark.rdd.MapPartitionsRDD
org.apache.spark.rdd.ReliableCheckpointRDD
---log6---
org.apache.spark.rdd.ShuffledRDD
org.apache.spark.rdd.MapPartitionsRDD
org.apache.spark.rdd.ReliableCheckpointRDD
3.2 Tracing checkpoint in the source
Start with def checkpoint(): Unit = RDDCheckpointData.synchronized in RDD.scala:
/**
 * Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint
 * directory set with `SparkContext#setCheckpointDir` and all references to its parent
 * RDDs will be removed. This function must be called before any job has been
 * executed on this RDD. It is strongly recommended that this RDD is persisted in
 * memory, otherwise saving it on a file will require recomputation.
 */
def checkpoint(): Unit = RDDCheckpointData.synchronized {
  // NOTE: we use a global lock here due to complexities downstream with ensuring
  // children RDD partitions point to the correct parent partitions. In the future
  // we should revisit this consideration.
  if (context.checkpointDir.isEmpty) {
    throw new SparkException("Checkpoint directory has not been set in the SparkContext")
  } else if (checkpointData.isEmpty) {
    checkpointData = Some(new ReliableRDDCheckpointData(this))
  }
}
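This only checks the checkpoint directory and initializes the checkpointData field, associating the RDD with a ReliableRDDCheckpointData object (the checkpoint results, such as the path, are later kept in that object). No data is saved yet. Data only exists once a job runs, so next we follow how a job is submitted and executed, starting from SparkContext.runJob: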
/**
 * Run a function on a given set of partitions in an RDD and pass the results to the given
 * handler function. This is the main entry point for all actions in Spark.
 *
 * @param rdd target RDD to run tasks on
 * @param func a function to run on each partition of the RDD
 * @param partitions set of partitions to run on; some jobs may not want to compute on all
 * partitions of the target RDD, e.g. for operations like `first()`
 * @param resultHandler callback to pass each result to
 */
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
  if (stopped.get()) {
    throw new IllegalStateException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  if (conf.getBoolean("spark.logLineage", false)) {
    logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
  }
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  progressBar.foreach(_.finishAll())
  rdd.doCheckpoint()
}
Now look at rdd.doCheckpoint():
/**
 * Performs the checkpointing of this RDD by saving this. It is called after a job using this RDD
 * has completed (therefore the RDD has been materialized and potentially stored in memory).
 * doCheckpoint() is called recursively on the parent RDDs.
 */
private[spark] def doCheckpoint(): Unit = {
  RDDOperationScope.withScope(sc, "checkpoint", allowNesting = false, ignoreParent = true) {
    if (!doCheckpointCalled) {
      doCheckpointCalled = true
      if (checkpointData.isDefined) {
        if (checkpointAllMarkedAncestors) {
          // TODO We can collect all the RDDs that needs to be checkpointed, and then checkpoint
          // them in parallel.
          // Checkpoint parents first because our lineage will be truncated after we
          // checkpoint ourselves
          dependencies.foreach(_.rdd.doCheckpoint())
        }
        checkpointData.get.checkpoint()
      } else {
        dependencies.foreach(_.rdd.doCheckpoint())
      }
    }
  }
}
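This involves one setting:
spark.checkpoint.checkpointAllMarkedAncestors
It defaults to false (RDD reads it as a SparkContext local property rather than from SparkConf) and controls whether ancestor RDDs that are also marked for checkpointing get checkpointed as well.
If the current RDD is not marked for checkpointing, doCheckpoint() is called recursively on its parent RDDs.
If the current RDD is marked for checkpointing, spark.checkpoint.checkpointAllMarkedAncestors decides whether its marked ancestors are checkpointed too. If so, the parents are processed first (recursively, which guarantees that an ancestor is checkpointed before its children); otherwise the recursion stops at the current RDD.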
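The control flow of RDD.doCheckpoint() with and without the setting can be sketched with a toy lineage. DoCheckpointSketch, Node and checkpointOrder are hypothetical; the lineage is a chain source, a, b, c (each the parent of the next), with a and b marked for checkpointing and the action run on c. "Checkpointing" here just records the node's name.

```scala
import scala.collection.mutable

object DoCheckpointSketch {
  // Hypothetical lineage node; `marked` plays the role of checkpointData.isDefined.
  final class Node(val name: String, marked: Boolean, parents: List[Node]) {
    private var doCheckpointCalled = false

    // Same control flow as RDD.doCheckpoint().
    def doCheckpoint(checkpointAllMarkedAncestors: Boolean, done: mutable.Buffer[String]): Unit =
      if (!doCheckpointCalled) {
        doCheckpointCalled = true
        if (marked) {
          // Parents first: our own lineage is truncated once we are checkpointed.
          if (checkpointAllMarkedAncestors) {
            parents.foreach(_.doCheckpoint(checkpointAllMarkedAncestors, done))
          }
          done += name
        } else {
          parents.foreach(_.doCheckpoint(checkpointAllMarkedAncestors, done))
        }
      }
  }

  // Returns the order in which RDDs get checkpointed after an action on c.
  def checkpointOrder(checkpointAllMarkedAncestors: Boolean): List[String] = {
    val source = new Node("source", marked = false, Nil)
    val a = new Node("a", marked = true, List(source))
    val b = new Node("b", marked = true, List(a))
    val c = new Node("c", marked = false, List(b))
    val done = mutable.ListBuffer.empty[String]
    c.doCheckpoint(checkpointAllMarkedAncestors, done)
    done.toList
  }
}
```

Without the setting only the nearest marked RDD (b) is checkpointed; with it, the marked ancestor a is checkpointed first, at the cost of one extra job per marked ancestor.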
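checkpointData.get returns the ReliableRDDCheckpointData(this) object created in checkpoint(), where this is the RDD being checkpointed. Its checkpoint() method is defined in the parent class: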
RDDCheckpointData.scala
/**
 * Materialize this RDD and persist its content.
 * This is called immediately after the first action invoked on this RDD has completed.
 */
final def checkpoint(): Unit = {
  // Guard against multiple threads checkpointing the same RDD by
  // atomically flipping the state of this RDDCheckpointData
  RDDCheckpointData.synchronized {
    if (cpState == Initialized) {
      cpState = CheckpointingInProgress
    } else {
      return
    }
  }
  val newRDD = doCheckpoint()
  // Update our state and truncate the RDD lineage
  RDDCheckpointData.synchronized {
    cpRDD = Some(newRDD)
    cpState = Checkpointed
    rdd.markCheckpointed()
  }
}

protected def doCheckpoint(): CheckpointRDD[T]
doCheckpoint() is abstract, so we look at the subclass implementation.
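The split between the final checkpoint() and the abstract doCheckpoint() is a template-method pattern guarded by a small state machine. A sketch, assuming an instance lock instead of Spark's global RDDCheckpointData lock and a subclass that only returns a fake path; CheckpointStateSketch, ToyCheckpointData and CountingCheckpointData are hypothetical, and the markCheckpointed() step is left out.

```scala
object CheckpointStateSketch {
  sealed trait CpState
  case object Initialized extends CpState
  case object CheckpointingInProgress extends CpState
  case object Checkpointed extends CpState

  // The final checkpoint() owns the state machine; subclasses only say how to write the data.
  abstract class ToyCheckpointData {
    @volatile private var cpState: CpState = Initialized
    def state: CpState = cpState

    protected def doCheckpoint(): String // returns the location written

    final def checkpoint(): Option[String] = {
      // Atomically flip Initialized -> CheckpointingInProgress; any later caller backs off.
      val proceed = synchronized {
        if (cpState == Initialized) { cpState = CheckpointingInProgress; true } else false
      }
      if (!proceed) None
      else {
        val location = doCheckpoint()
        synchronized { cpState = Checkpointed }
        Some(location)
      }
    }
  }

  // Hypothetical subclass standing in for ReliableRDDCheckpointData.
  final class CountingCheckpointData(dir: String) extends ToyCheckpointData {
    var writes = 0
    protected def doCheckpoint(): String = { writes += 1; s"$dir/rdd-1" }
  }
}
```

However often checkpoint() is called, the data is written once; this mirrors why the real method simply returns when cpState is no longer Initialized.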
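The subclass used by checkpoint() is ReliableRDDCheckpointData.scala (the other subclass, LocalRDDCheckpointData, backs localCheckpoint()):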
/**
 * Materialize this RDD and write its content to a reliable DFS.
 * This is called immediately after the first action invoked on this RDD has completed.
 */
protected override def doCheckpoint(): CheckpointRDD[T] = {
  val newRDD = ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd, cpDir)
  // Optionally clean our checkpoint files if the reference is out of scope
  if (rdd.conf.get(CLEANER_REFERENCE_TRACKING_CLEAN_CHECKPOINTS)) {
    rdd.context.cleaner.foreach { cleaner =>
      cleaner.registerRDDCheckpointDataForCleanup(newRDD, rdd.id)
    }
  }
  logInfo(s"Done checkpointing RDD ${rdd.id} to $cpDir, new parent is RDD ${newRDD.id}")
  newRDD
}
It calls writeRDDToCheckpointDirectory on the ReliableCheckpointRDD companion object:
/**
 * Write RDD to checkpoint files and return a ReliableCheckpointRDD representing the RDD.
 */
def writeRDDToCheckpointDirectory[T: ClassTag](
    originalRDD: RDD[T],
    checkpointDir: String,
    blockSize: Int = -1): ReliableCheckpointRDD[T] = {
  val checkpointStartTimeNs = System.nanoTime()
  val sc = originalRDD.sparkContext

  // Create the output path for the checkpoint
  val checkpointDirPath = new Path(checkpointDir)
  val fs = checkpointDirPath.getFileSystem(sc.hadoopConfiguration)
  if (!fs.mkdirs(checkpointDirPath)) {
    throw new SparkException(s"Failed to create checkpoint path $checkpointDirPath")
  }

  // Save to file, and reload it as an RDD
  val broadcastedConf = sc.broadcast(
    new SerializableConfiguration(sc.hadoopConfiguration))
  // TODO: This is expensive because it computes the RDD again unnecessarily (SPARK-8582)
  sc.runJob(originalRDD,
    writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf) _)

  if (originalRDD.partitioner.nonEmpty) {
    writePartitionerToCheckpointDir(sc, originalRDD.partitioner.get, checkpointDirPath)
  }

  val checkpointDurationMs =
    TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - checkpointStartTimeNs)
  logInfo(s"Checkpointing took $checkpointDurationMs ms.")

  val newRDD = new ReliableCheckpointRDD[T](
    sc, checkpointDirPath.toString, originalRDD.partitioner)
  if (newRDD.partitions.length != originalRDD.partitions.length) {
    throw new SparkException(
      "Checkpoint RDD has a different number of partitions from original RDD. Original " +
        s"RDD [ID: ${originalRDD.id}, num of partitions: ${originalRDD.partitions.length}]; " +
        s"Checkpoint RDD [ID: ${newRDD.id}, num of partitions: " +
        s"${newRDD.partitions.length}].")
  }
  newRDD
}
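This submits a job, which amounts to running an action on the RDD being checkpointed, where the function applied to each partition is writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf). (If spark.checkpoint.checkpointAllMarkedAncestors mentioned above is enabled, every marked ancestor gets its own such job, which quickly becomes expensive.) Roughly: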
mapRDD.checkpoint()
mapRDD.collect(ReliableCheckpointRDD.writeRDDToCheckpointDirectory())
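The two lines above are roughly equivalent in logic, although the second one is pseudocode and not valid syntax. So is checkpoint an action or a transformation?
The job writes each partition of the RDD to the checkpoint directory through writePartitionToCheckpointFile (this involves IO); if the RDD has a partitioner, writePartitionerToCheckpointDir then writes the partitioner as well. Finally a new ReliableCheckpointRDD is created with the same SparkContext and partitioner as the original RDD, pointing at the directory where the data was written; after checking that its number of partitions matches the original, this ReliableCheckpointRDD is returned.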
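The write, reload and partition-count check in writeRDDToCheckpointDirectory can be mimicked locally with java.nio. A sketch, assuming partitions are plain Vector[Int] values and each partition is written as a text file named part-00000, part-00001 and so on; WriteCheckpointSketch and its file format are illustrative only, not Spark's on-disk format.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.{Files, Path}

object WriteCheckpointSketch {
  // One file per partition (plays the role of writePartitionToCheckpointFile).
  def writePartitions(partitions: Vector[Vector[Int]], dir: Path): Unit =
    partitions.zipWithIndex.foreach { case (values, i) =>
      Files.write(dir.resolve(f"part-$i%05d"), values.mkString("\n").getBytes(UTF_8))
    }

  // Reload: one partition per part file, in file-name order.
  def readPartitions(dir: Path): Vector[Vector[Int]] =
    dir.toFile.listFiles().filter(_.getName.startsWith("part-")).sortBy(_.getName).toVector.map { f =>
      val text = new String(Files.readAllBytes(f.toPath), UTF_8)
      if (text.isEmpty) Vector.empty[Int] else text.split("\n").toVector.map(_.toInt)
    }

  // Same shape as writeRDDToCheckpointDirectory: create dir, write, reload, check partition count.
  def checkpoint(partitions: Vector[Vector[Int]], dir: Path): Vector[Vector[Int]] = {
    Files.createDirectories(dir)
    writePartitions(partitions, dir)
    val reloaded = readPartitions(dir)
    if (reloaded.length != partitions.length) {
      throw new IllegalStateException(
        s"Checkpoint has ${reloaded.length} partitions, original has ${partitions.length}")
    }
    reloaded
  }
}
```

The reloaded data, not the original in-memory objects, is what later reads go through, which is why the partition-count check matters.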
Back in RDDCheckpointData.checkpoint(), where doCheckpoint() was called:
After doCheckpoint() returns, the state is updated and rdd.markCheckpointed() is called:
/**
 * Changes the dependencies of this RDD from its original parents to a new RDD (`newRDD`)
 * created from the checkpoint file, and forget its old dependencies and partitions.
 */
private[spark] def markCheckpointed(): Unit = stateLock.synchronized {
  clearDependencies()
  partitions_ = null
  deps = null // Forget the constructor argument for dependencies too
}

/**
 * Clears the dependencies of this RDD. This method must ensure that all references
 * to the original parent RDDs are removed to enable the parent RDDs to be garbage
 * collected. Subclasses of RDD may override this method for implementing their own cleaning
 * logic. See [[org.apache.spark.rdd.UnionRDD]] for an example.
 */
protected def clearDependencies(): Unit = stateLock.synchronized {
  dependencies_ = null
}
After the RDD object mapRDD has been checkpointed, its dependencies on its parent RDDs are cut (dependencies_ = null). Which RDD is then the source of its DAG?
Look at RDD.scala:
final def dependencies: Seq[Dependency[_]] = {
  checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {
    if (dependencies_ == null) {
      stateLock.synchronized {
        if (dependencies_ == null) {
          dependencies_ = getDependencies
        }
      }
    }
    dependencies_
  }
}
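Once an RDD has been checkpointed, checkpointRDD is defined, so its only parent dependency becomes a OneToOneDependency on the checkpoint RDD object (a ReliableCheckpointRDD), and the RDD itself becomes the second RDD in the DAG, right after that new source.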
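How dependencies switches over to the checkpoint RDD can be sketched with a toy node class. MarkCheckpointedSketch, Node, OneToOne and lineage are hypothetical stand-ins for RDD, OneToOneDependency and printRDD, and the sketch folds checkpointRDD and markCheckpointed() into one method for brevity. After mapRDD is marked as checkpointed, walking the lineage from the ShuffledRDD ends at the checkpoint RDD, matching log6 in 3.1.

```scala
object MarkCheckpointedSketch {
  // Hypothetical stand-in for OneToOneDependency.
  final case class OneToOne(parent: Node)

  final class Node(val name: String, private var parents: List[Node]) {
    private var cachedDeps: Option[List[OneToOne]] = None // plays the role of dependencies_
    private var checkpointRDD: Option[Node] = None

    // Like RDD.dependencies: a materialized checkpoint takes precedence over the original lineage.
    def dependencies: List[OneToOne] = checkpointRDD match {
      case Some(cp) => List(OneToOne(cp))
      case None =>
        if (cachedDeps.isEmpty) cachedDeps = Some(parents.map(OneToOne(_)))
        cachedDeps.get
    }

    // Like markCheckpointed(): point at the checkpoint and forget the old parents.
    def markCheckpointed(cp: Node): Unit = {
      checkpointRDD = Some(cp)
      cachedDeps = None // clearDependencies()
      parents = Nil     // deps = null
    }
  }

  // Like printRDD in the test code.
  def lineage(n: Node): List[String] = n.name :: n.dependencies.flatMap(d => lineage(d.parent))
}
```

Dropping the references to the old parents is what allows them to be garbage collected, as the clearDependencies() comment notes.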
[Figure: RDD lineage before and after checkpoint]
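The logs show the same thing: in log5 and log6 of 3.1, ReliableCheckpointRDD has taken the place of ParallelCollectionRDD as the parent of MapPartitionsRDD.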
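3.3 Checkpoints outlive the application
A Spark checkpoint needs an explicitly specified storage location, and by default the checkpoint files are not deleted when the application ends (they are only cleaned up when spark.cleaner.referenceTracking.cleanCheckpoints is enabled and the RDD is garbage collected; see the CLEANER_REFERENCE_TRACKING_CLEAN_CHECKPOINTS check in doCheckpoint() above).
SparkContext also has a function that creates a source RDD from a checkpoint directory (checkpointFile), but it is package-private (protected[spark]).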
4. Execution order of checkpointing and caching
Back to where an RDD's data is obtained:
/**
 * Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
 * This should ''not'' be called by users directly, but is available for implementors of custom
 * subclasses of RDD.
 */
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    getOrCompute(split, context)
  } else {
    computeOrReadCheckpoint(split, context)
  }
}

private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
{
  if (isCheckpointedAndMaterialized) {
    firstParent[T].iterator(split, context)
  } else {
    compute(split, context)
  }
}
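If the RDD is persisted, its data is read from the cache; if the block is not cached yet, it is computed and Spark tries to cache it.
If there is no cache but a materialized checkpoint, the data is read from the checkpoint via firstParent[T].iterator(split, context), where firstParent is the ReliableCheckpointRDD; the current RDD's own compute method is not executed.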
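The decision order in iterator() and computeOrReadCheckpoint() can be written down as a small pure function. ReadOrderSketch and its three Boolean inputs are hypothetical descriptions of an RDD's state, and the "store block" step assumes the block fits in storage.

```scala
object ReadOrderSketch {
  // Steps RDD.iterator takes for one partition, following the two methods above.
  def readPath(storageLevelSet: Boolean, blockCached: Boolean, checkpointMaterialized: Boolean): List[String] = {
    // computeOrReadCheckpoint: a materialized checkpoint wins over compute().
    val computeOrReadCheckpoint =
      if (checkpointMaterialized) List("ReliableCheckpointRDD.iterator") else List("compute")
    if (!storageLevelSet) computeOrReadCheckpoint                 // no persist(): skip the cache
    else if (blockCached) List("read cached block")               // cache hit: nothing else runs
    else "cache miss" :: computeOrReadCheckpoint ::: List("store block")
  }
}
```

Because the cache is consulted before the checkpoint, an RDD that is both persisted and checkpointed reads its cached block whenever one exists.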
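What if an RDD has both a checkpoint and a cache? Does the order in which persist() and checkpoint() are called matter?
In DAGScheduler.scala, submitting a job returns a JobWaiter object, which is used to block the current (driver) thread until the job finishes or to cancel the job, so jobs are submitted serially. (After the driver thread submits a job, the rest of the work is handled by a daemon thread named "dag-scheduler-event-loop".)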
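Then look again at SparkContext.runJob shown in 3.2: rdd.doCheckpoint() is only called after dagScheduler.runJob(...) has returned.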
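So the job created by the checkpoint runs after the job created by the action, which means caching always happens before checkpointing, regardless of the order in which the two were called. When the checkpoint job runs and fetches mapRDD's data, the data is already cached, so mapRDD's compute method is not triggered; the data is read directly from the cache.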
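5. Summary
1. Caching (cache, persist) does not change the RDD lineage, but it does cut the computation along the dependency chain: once an RDD's data is cached, reading it does not re-run the logic of that RDD or of any RDD upstream of it.
2. Spark caches are application-scoped: they are kept in the application's temporary storage (on disk when the storage level requires it), so every job in the application can access them until the application ends and the caches are deleted.
3. Even with storage level StorageLevel.MEMORY_ONLY, reading cached data may involve IO, namely network IO.
4. Checkpointing changes the RDD lineage: after an RDD has been checkpointed, its parent dependency becomes the checkpoint RDD object ReliableCheckpointRDD via a OneToOneDependency, and the RDD itself becomes the second RDD in the DAG.
5. Checkpoints outlive the application: the checkpoint directory is not deleted when the application ends, and SparkContext can create a ReliableCheckpointRDD from a checkpoint directory as the source of a DAG (protected[spark]).
6. Checkpointing submits a job:
checkpoint is an odd one. By itself it submits nothing: if no action ever runs on the RDD (or an RDD derived from it), no job is submitted and no checkpoint is written, so it looks like a transformation. Once such an action completes, SparkContext.runJob calls doCheckpoint(), which submits an extra job, so it also looks like an action.
7. Caching happens before checkpointing: the checkpoint job runs after the job created by the action, so caching always happens first, regardless of the order in which the two were called.
8. Checkpointing should be combined with caching: the checkpoint job re-runs the RDD's logic, but if the RDD's data is cached, the job reads the results computed by the action from the cache and only has to write them out, without recomputation.
9. With multiple jobs, caching an RDD they share avoids repeated computation.
10. With a single job but a very long DAG, caching an expensive RDD helps recover data when a task fails.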
This article was reposted from 学新通技术网.
Article path: /boutique/detail/tanhgcibai