• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

Spark Job写文件个数的控制以和小文件合并的优化

武飞扬头像
wankunde
帮助1

背景说明

大数据领域,平台小文件治理一直是一个非常重要的问题。我司大佬在Spark平台里,在向目标表中增加一个Shuffle,然后在Reduce端合并数据,以实现将小文件合并成大文件,来减少平台中的小文件。
我司还对单个任务写HDFS文件个数做了限制,同时限制了单个Task 和 单次Job 可写的HDFS个数限制。

通过引入额外Shuffle对写入数据进行合并

最终实现效果如下

== Optimized Logical Plan ==
CreateDataSourceTableAsSelectCommand `P_WAKUN_T`.`user_part_2`, ErrorIfExists, [id, name, dt]
 - Relation p_wakun_t.user_part[id#18274940,name#18274941,dt#18274942] parquet

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
 - Execute CreateDataSourceTableAsSelectCommand `P_WAKUN_T`.`user_part_2`, ErrorIfExists, [id, name, dt]
    - CustomShuffleReader coalesced
       - ShuffleQueryStage 0
          - Exchange RoundRobinPartitioning(10000), REPARTITION_BY_NONE, [id=#30214771]
             - *(1) ColumnarToRow
                - FileScan parquet p_wakun_t.user_part[id#18274940,name#18274941,dt#18274942] Batched: true, DataFilters: [], Format: Parquet, Location: xx

EnsureRepartitionForWriting Rule

EnsureRepartitionForWriting 中对 DataWritingCommandExec 进行数据写入之前,增加一个Shuffle。
当然 Rule 中还要考虑Partition table, Bucket table 的Shuffle 方式,不能把数据给搞混了。

CoalesceShufflePartitions Rule

CoalesceShufflePartitions Rule 会根据Shuffle结果,coalesce 数据到合适的 Partition 个数。

OptimizeShuffleWithLocalRead Rule

Local Shuffle Read 是Spark新增的对Spark Shuffle 过程进行优化的Rule,当Shuffle required distribution 不需要按照Hash分布的约束,以及满足其他的一些条件时,Reduce 端修改为连续读某一个Map 的Shuffle Output,这样会有更好的数据本地性,Shuffle 性能也会有提升。
这个Rule 之前叫 OptimizeLocalShuffleReader Rule。
其他应用条件:

  • 如果是 DataWritingCommandExec, 只能优化它的Child 节点
  • 如果是 Shuffle Query Stage, Shuffle 类型只能是 ENSURE_REQUIREMENTSREPARTITION_BY_NONE

分布式数据写控制

在 Hadoop 的 MapReduce 中,通过 FileOutputCommitter 控制分布式数据写Job setup,Task commit, 以及Job commit.
FileOutputCommitter 的 v1 算法对task 输出做两次rename 控制, v2算法对task输出做一次rename控制。

在Spark中有一套新的 FileCommitProtocol, 组合使用了 Hadoop 的 FileOutputCommitter 来控制Job 的写过程。上面要实现的控制单 Task 和 Job 输出文件个数的实现也就是在这里实现的。
通过下面的时序图可以看到,Task端可以通过创建新的文件 newTaskTempFile() 时check task file number; SparkContext.runJob() 方法有一个参数 resultHandler 用于处理Task 执行完成后 result 的回调。写数据的Task 最终返回的结果就是 WriteTaskResult (内部包含写的文件个数),在 resultHandler 中对所有Tasks 的写文件个数进行累加。当超过 maxCreatedFilesInDynamicPartition 报错。

学新通

FileFormatWriter SparkContext Task SQLHadoopMapReduceCommitProtocol OutputCommitter DynamicPartitionDataWriter createCommitter() setupJob(jobContext) setupCommitter() setupJob() runJob() executeTask() setupTask() setupTask() Execute Task write() newTaskTempFile() create new file && fileCounter = 1 loop [call DynamicPartitionDataWriter to write data] commitTask() commitTask() abortTask() abortTask() alt [task success] [task fail] WriteTaskResult onTaskCommit() check file numbers commitJob() commitJob() move all stage files to final directory abort() abortJob() alt [job success] [job fail] FileFormatWriter SparkContext Task SQLHadoopMapReduceCommitProtocol OutputCommitter DynamicPartitionDataWriter

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhggifie
系列文章
更多 icon
同类精品
更多 icon
继续加载