
How Spark's ShuffleWriters Are Implemented, Part 3: SortShuffleWriter

L4mbert

Based on the 3.2 branch.

SortShuffleWriter

Imagine having to sort data far larger than available memory. Loading everything into memory at once is clearly not feasible, so the data has to be kept on disk, i.e., sorted externally. Spark uses merge sort [1] for this, and that is exactly what SortShuffleWriter does.

When writing, SortShuffleWriter chooses between two data structures for sorting depending on whether mapSideCombine [2] is enabled: with mapSideCombine it uses a map, otherwise a buffer.
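
As a quick illustration of which path a job takes, here is a sketch against the public RDD API (not the writer's own code; note that whether SortShuffleWriter is chosen at all also depends on other conditions, such as the number of partitions):

// Assuming an existing SparkContext `sc`.
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

// reduceByKey builds its ShuffleDependency with mapSideCombine = true,
// so ExternalSorter receives an aggregator and uses the map (PartitionedAppendOnlyMap).
val reduced = pairs.reduceByKey(_ + _)

// groupByKey sets mapSideCombine = false, so no aggregator is passed to ExternalSorter
// and the buffer (PartitionedPairBuffer) is used instead.
val grouped = pairs.groupByKey()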

Merge sort proceeds in two phases: phase one produces sorted spill files chunk by chunk, and phase two merges them into a single, fully sorted output. Let's first look at how Spark implements phase one:

// SortShuffleWriter
override def write(records: Iterator[Product2[K, V]]): Unit = {
  sorter = if (dep.mapSideCombine) {
    // With mapSideCombine, an aggregator and a keyOrdering are passed in
    new ExternalSorter[K, V, C](
      context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
  } else {
    // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
    // care whether the keys get sorted in each partition; that will be done on the reduce side
    // if the operation being run is sortByKey.
    // Without mapSideCombine, both the aggregator and the keyOrdering are None.
    // Per the official comment above, when mapSideCombine is not needed we don't care
    // whether keys are sorted within each partition; if that is required, it happens
    // on the reduce side, e.g. for sortByKey.
    new ExternalSorter[K, V, V](
      context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
  }
  // insertAll checks whether an aggregator is present, picks the corresponding data
  // structure, sorts, and spills to external files; we look at this next
  sorter.insertAll(records)

  // Phase two of the merge sort is omitted here
  // ...
}
// ExternalSorter
def insertAll(records: Iterator[Product2[K, V]]): Unit = {
  // TODO: stop combining if we find that the reduction factor isn't high
  val shouldCombine = aggregator.isDefined

  if (shouldCombine) {
    // This is where mapSideCombine happens: values for the same key are aggregated
    // Combine values in-memory first using our AppendOnlyMap
    val mergeValue = aggregator.get.mergeValue
    val createCombiner = aggregator.get.createCombiner
    var kv: Product2[K, V] = null
    val update = (hadValue: Boolean, oldValue: C) => {
      // Aggregate the values for the same key
      if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
    }
    while (records.hasNext) {
      // Track the number of records read
      addElementsRead()
      kv = records.next()
      // Update the value associated with this key
      map.changeValue((getPartition(kv._1), kv._1), update)
      // Check whether the in-memory collection needs to be spilled to disk; examined below
      maybeSpillCollection(usingMap = true)
    }
  } else {
    // Stick values into our buffer
    // No map-side aggregation of values for the same key
    while (records.hasNext) {
      addElementsRead()
      val kv = records.next()
      // Append the record to the buffer: slot [N] stores (partition id, key) and slot [N+1] stores the value
      buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
      maybeSpillCollection(usingMap = false)
    }
  }
}
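
To see what the update closure and map.changeValue accomplish together, here is a minimal sketch in plain Scala. It keys a regular mutable.Map by (partitionId, key); the helper and its names are illustrative only and are not Spark's PartitionedAppendOnlyMap:

import scala.collection.mutable

// Aggregate (key, value) records per (partition id, key), mirroring the shouldCombine branch above.
def insertAllSketch[K, V, C](
    records: Iterator[(K, V)],
    getPartition: K => Int,
    createCombiner: V => C,
    mergeValue: (C, V) => C): mutable.Map[(Int, K), C] = {
  val map = mutable.Map.empty[(Int, K), C]
  records.foreach { case (k, v) =>
    val mapKey = (getPartition(k), k)
    map(mapKey) = map.get(mapKey) match {
      case Some(existing) => mergeValue(existing, v) // hadValue == true
      case None           => createCombiner(v)       // hadValue == false
    }
  }
  map
}

// For a word count, createCombiner is the identity and mergeValue is addition:
// insertAllSketch(records, k => k.hashCode.abs % numPartitions, (v: Int) => v, (c: Int, v: Int) => c + v)
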
// ExternalSorter
private def maybeSpillCollection(usingMap: Boolean): Unit = {
  var estimatedSize = 0L
  // The idea here: estimate the current collection's size and, once it exceeds the
  // threshold, spill it to disk and replace it with a fresh, empty collection
  if (usingMap) {
    estimatedSize = map.estimateSize()
    // maybeSpill is examined below
    if (maybeSpill(map, estimatedSize)) {
      map = new PartitionedAppendOnlyMap[K, C]
    }
  } else {
    estimatedSize = buffer.estimateSize()
    if (maybeSpill(buffer, estimatedSize)) {
      buffer = new PartitionedPairBuffer[K, C]
    }
  }

  if (estimatedSize > _peakMemoryUsedBytes) {
    _peakMemoryUsedBytes = estimatedSize
  }
}
// Spillable
// Decide whether a spill is needed: returns true if so, false otherwise.
// If a spill is needed, the collection is also written to disk here.
protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
  var shouldSpill = false
  if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
    // Claim up to double our current memory from the shuffle memory pool
    val amountToRequest = 2 * currentMemory - myMemoryThreshold
    val granted = acquireMemory(amountToRequest)
    myMemoryThreshold += granted
    // If we were granted too little memory to grow further (either tryToAcquire returned 0,
    // or we already had more memory than myMemoryThreshold), spill the current collection
    shouldSpill = currentMemory >= myMemoryThreshold
  }
  shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
  // Actually spill
  // A spill is required
  if (shouldSpill) {
    _spillCount += 1
    logSpillage(currentMemory)
    // Write the collection to disk; the spill implementation is examined below
    spill(collection)
    _elementsRead = 0
    _memoryBytesSpilled += currentMemory
    // Notify the task memory manager to release the memory
    releaseMemory()
  }
  shouldSpill
}
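
To make the threshold arithmetic concrete, here is a worked example with purely illustrative numbers (they are not Spark defaults):

// Suppose the collection is estimated at 6 MB and myMemoryThreshold is currently 5 MB.
val currentMemory     = 6L * 1024 * 1024
var myMemoryThreshold = 5L * 1024 * 1024
// Ask the memory pool for enough to roughly double our footprint: 2 * 6 - 5 = 7 MB.
val amountToRequest = 2 * currentMemory - myMemoryThreshold
// If only 1 MB is granted, the threshold grows to 6 MB; currentMemory >= myMemoryThreshold
// still holds, so shouldSpill ends up true and the collection is spilled to disk.
val granted = 1L * 1024 * 1024
myMemoryThreshold += granted
val shouldSpill = currentMemory >= myMemoryThreshold // true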

// ExternalSorter
override protected[this] def spill(collection: WritablePartitionedPairCollection[K, C]): Unit = {
  // Sort the keys before writing to disk; note that without mapSideCombine only the
  // partition id is sorted on.
  // comparator and destructiveSortedWritablePartitionedIterator are examined below
  val inMemoryIterator = collection.destructiveSortedWritablePartitionedIterator(comparator)
  // Write this spill to disk
  val spillFile = spillMemoryIteratorToDisk(inMemoryIterator)
  // Record the spill file
  spills += spillFile
}
// The comparator used above: when there is no mapSideCombine it is None
private def comparator: Option[Comparator[K]] = {
  if (ordering.isDefined || aggregator.isDefined) {
    Some(keyComparator)
  } else {
    None
  }
}
// WritablePartitionedPairCollection
def destructiveSortedWritablePartitionedIterator(keyComparator: Option[Comparator[K]])
  : WritablePartitionedIterator = {
  // This method has different implementations depending on whether the map or the buffer is in use; both are shown below
  val it = partitionedDestructiveSortedIterator(keyComparator)
  new WritablePartitionedIterator {
    private[this] var cur = if (it.hasNext) it.next() else null

    def writeNext(writer: DiskBlockObjectWriter): Unit = {
      writer.write(cur._1._2, cur._2)
      cur = if (it.hasNext) it.next() else null
    }

    def hasNext(): Boolean = cur != null

    def nextPartition(): Int = cur._1._1
  }
}

// PartitionedPairBuffer
// The buffer path, i.e. no mapSideCombine
/** Iterate through the data in a given order. For this class this is not really destructive. */
override def partitionedDestructiveSortedIterator(keyComparator: Option[Comparator[K]])
  : Iterator[((Int, K), V)] = {
  // keyComparator is None here, so partitionComparator is used, i.e. records are sorted by partition id only
  val comparator = keyComparator.map(partitionKeyComparator).getOrElse(partitionComparator)
  new Sorter(new KVArraySortDataFormat[(Int, K), AnyRef]).sort(data, 0, curSize, comparator)
  iterator
}

// PartitionedAppendOnlyMap
// The map path, i.e. mapSideCombine is enabled
def partitionedDestructiveSortedIterator(keyComparator: Option[Comparator[K]])
  : Iterator[((Int, K), V)] = {
  // keyComparator is defined here, so the comparator sorts by partition id first and then by key
  val comparator = keyComparator.map(partitionKeyComparator).getOrElse(partitionComparator)
  // In-memory sort
  destructiveSortedIterator(comparator)
}
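
The difference between the two orderings can be shown with a few plain-Scala lines. This uses the natural String ordering for readability; when no Ordering is supplied, Spark actually falls back to a hash-based key comparator:

val recs = Seq(((1, "b"), 10), ((0, "c"), 20), ((1, "a"), 30), ((0, "a"), 40))

// Buffer path (partitionComparator): sort by partition id only; key order inside a partition is arbitrary.
val byPartition = recs.sortBy { case ((pid, _), _) => pid }

// Map path (partitionKeyComparator): sort by partition id first, then by key.
val byPartitionAndKey = recs.sortBy { case ((pid, k), _) => (pid, k) }
// => ((0,"a"),40), ((0,"c"),20), ((1,"a"),30), ((1,"b"),10)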

At this point the spills have been sorted and written out to external files. Next we need to see how Spark merges these files, so back to SortShuffleWriter:

// SortShuffleWriter
override def write(records: Iterator[Product2[K, V]]): Unit = {
  // Phase one of the merge sort omitted here
  // sorter.insertAll(records)

  // The File object for the final, fully sorted output
  val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
  // Create a temp file for the fully sorted output (the output file name plus a random UUID suffix)
  val tmp = Utils.tempFileWith(output)
  try {
    val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
    // Merge into the fully sorted output file. The returned partitionLengths array is
    // laid out as array[partitionId] = fileSegment.length, i.e. each element holds the
    // size of that partition's file segment. writePartitionedFile is examined below.
    val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
    // Write the index file; this was already covered in the BypassMergeSortShuffleWriter article
    shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
    // Report this task's result
    mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
  } finally {
    if (tmp.exists() && !tmp.delete()) {
      logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
    }
  }
}
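
As a brief aside before diving into writePartitionedFile: the index file is essentially the running sum of partitionLengths, so that reducer i knows which byte range of the data file to fetch. A one-line sketch with illustrative values:

val partitionLengths = Array(120L, 0L, 340L, 80L)   // bytes written per partition
val offsets = partitionLengths.scanLeft(0L)(_ + _)  // Array(0, 120, 120, 460, 540)
// Reducer i reads the byte range [offsets(i), offsets(i + 1)) of the data file.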

// ExternalSorter
def writePartitionedFile(
    blockId: BlockId,
    outputFile: File): Array[Long] = {

  // Track location of each range in the output file
  val lengths = new Array[Long](numPartitions)
  val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize,
    context.taskMetrics().shuffleWriteMetrics)

  // If phase one produced no spill files, an in-memory sort is all that is needed
  if (spills.isEmpty) {
    // Case where we only have in-memory data
    val collection = if (aggregator.isDefined) map else buffer
    val it = collection.destructiveSortedWritablePartitionedIterator(comparator)
    while (it.hasNext) {
      val partitionId = it.nextPartition()
      while (it.hasNext && it.nextPartition() == partitionId) {
        it.writeNext(writer)
      }
      val segment = writer.commitAndGet()
      lengths(partitionId) = segment.length
    }
  } else {
    // Phase one produced spill files, so a merge sort is required. The key piece is
    // partitionedIterator, which performs the merge; its output is then written to the
    // file. partitionedIterator is examined below.
    // We must perform merge-sort; get an iterator by partition and write everything directly.
    for ((id, elements) <- this.partitionedIterator) {
      if (elements.hasNext) {
        for (elem <- elements) {
          writer.write(elem._1, elem._2)
        }
        val segment = writer.commitAndGet()
        lengths(id) = segment.length
      }
    }
  }

  writer.close()
  // Reporting of disk and memory usage metrics omitted...
  lengths
}
// ExternalSorter
def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])] = {
  val usingMap = aggregator.isDefined
  // Some data may never have been written to a spill file during phase one and is still in memory, so it has to be pulled in and merged as well
  val collection: WritablePartitionedPairCollection[K, C] = if (usingMap) map else buffer
  if (spills.isEmpty) {
    // In-memory-only case omitted
  } else {
    // Merge spilled and in-memory data
    // Merge the spill files with the in-memory data. Note that the in-memory data is
    // sorted first, using the same sort we saw in phase one. merge is examined below.
    merge(spills, destructiveIterator(
      collection.partitionedDestructiveSortedIterator(comparator)))
  }
}


private def merge(spills: Seq[SpilledFile], inMemory: Iterator[((Int, K), C)])
    : Iterator[(Int, Iterator[Product2[K, C]])] = {
  // Create a reader for each spill file
  val readers = spills.map(new SpillReader(_))
  // Wrap the in-memory data in a buffered iterator
  val inMemBuffered = inMemory.buffered
  (0 until numPartitions).iterator.map { p =>
    // IteratorForPartition yields only the records belonging to partition p
    val inMemIterator = new IteratorForPartition(p, inMemBuffered)
    // Recall that in phase one every spill file was sorted at least by partition id.
    // Here the per-partition iterator of each spill file is combined with the in-memory
    // iterator for the same partition, giving the set of iterators for this partition.
    val iterators = readers.map(_.readNextPartition()) ++ Seq(inMemIterator)
    if (aggregator.isDefined) {
      // Perform partial aggregation across partitions
      // Merge within this partition. We won't go deeper here: it is a standard k-way
      // merge (a sketch follows this block); read the source if you are interested.
      // The result is (partition id, sorted iterator over that partition's data).
      (p, mergeWithAggregation(
        iterators, aggregator.get.mergeCombiners, keyComparator, ordering.isDefined))
    } else if (ordering.isDefined) {
      // This branch is never taken in the SortShuffleWriter scenario; as the official comment notes, it applies to the reduce side of sortByKey
      // No aggregator given, but we have an ordering (e.g. used by reduce tasks in sortByKey);
      // sort the elements without trying to merge them
      (p, mergeSort(iterators, ordering.get))
    } else {
      (p, iterators.iterator.flatten)
    }
  }
}
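
For readers who want the k-way merge spelled out, here is a generic sketch in plain Scala (not Spark's mergeSort or mergeWithAggregation): keep each sorted iterator in a priority queue ordered by its head key, and repeatedly emit the smallest head.

import scala.collection.mutable

def kWayMerge[K, V](sources: Seq[Iterator[(K, V)]])(implicit ord: Ordering[K]): Iterator[(K, V)] = {
  // Order buffered iterators by their head key; reverse because PriorityQueue is a max-heap.
  val heap = mutable.PriorityQueue.empty[BufferedIterator[(K, V)]](
    Ordering.by((it: BufferedIterator[(K, V)]) => it.head._1).reverse)
  heap ++= sources.map(_.buffered).filter(_.hasNext)

  new Iterator[(K, V)] {
    def hasNext: Boolean = heap.nonEmpty
    def next(): (K, V) = {
      val it = heap.dequeue()          // the iterator whose head key is currently smallest
      val kv = it.next()
      if (it.hasNext) heap.enqueue(it) // its new head may no longer be the smallest
      kv
    }
  }
}

Roughly speaking, mergeWithAggregation does this and additionally folds records with equal keys together via mergeCombiners as they come off the merge.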

  1. Merge sort: read a chunk of the data at a time, sort it in memory, and write it to disk as a sorted file; once all the data has been written out in chunks, merge the sorted files. SortShuffleWriter uses a k-way merge. ↩︎

  2. mapSideCombine: combining records on the map side to reduce the amount of data shuffled. ↩︎
