• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

Spark005---map、mapPartitions

武飞扬头像
维格堂406小队
帮助1

Intro

map、mapPartitions的使用和差异

map

import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().master("local[*]").getOrCreate()
import org.apache.spark.sql.SparkSession
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@22ed5cd
val dataRDD = spark.sparkContext.makeRDD(List(1, 2, 3, 4), 2)
    val dataRDD1 = dataRDD.map(
      num => {
        println("step1<<<<<<<<"   num)
        num
      }
    )
    val dataRDD2 = dataRDD1.map(
      num => {
        println("step2>>>>>>>>"   num)
        num
      }
    )
println(dataRDD2.collect().toArray.mkString(","))
step1<<<<<<<<1
step1<<<<<<<<3
step2>>>>>>>>3
step2>>>>>>>>1
step1<<<<<<<<4
step2>>>>>>>>4
step1<<<<<<<<2
step2>>>>>>>>2
1,2,3,4





dataRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:27
dataRDD1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:28
dataRDD2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at map at <console>:34
学新通
  • 不同分区并行处理,1、3分属两个分区,同时执行
  • 对于某一个具体数据,同时执行完step1和step2
  • 对于同一个分区数据(1,2),顺次执行,1执行完step1、step2,2再执行step1、2

mapPartitions

val dataRDD3 = dataRDD.mapPartitions(
      data => {
        println("step1<<<<<<<<")
        // 转成array,方便查看数据
        val data1 = data.map(num => num).toArray
        //        println(data1.mkString(","))
        println("step2>>>>>>>>")
        val data2 = data1.map(num => num * 2)
        println(data2.mkString(","))
        data2.iterator
      }
    )
println(dataRDD3.collect().toArray.mkString(","))
step1<<<<<<<<
step1<<<<<<<<
step2>>>>>>>>
step2>>>>>>>>
2,4
6,8
2,4,6,8





dataRDD3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[3] at mapPartitions at <console>:27
  • mapPartitions同一分区批处理,分区数据(1,2)整体执行step1、step2
  • 不同分区可以整体执行,所以可看到有两个step1

mapPartitionsWithIndex

将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据,在处理时同时可以获取当前分区索引
感觉平时并不怎么用,单纯记录下吧

    val dataRDD4 = dataRDD.mapPartitionsWithIndex(
      (index, datas) => {
        datas.map(x => (index,x))
      }
    )
    println(dataRDD4.collect().toArray.mkString(","))
(0,1),(0,2),(1,3),(1,4)





dataRDD4: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[4] at mapPartitionsWithIndex at <console>:27

两个小测试

  • 找出每个分区最大的值
  • 找出第二个分区的数据
    val dataRDD5 = dataRDD.mapPartitionsWithIndex(
      (index, datas) => {
        List(s"index=${index},max=${datas.max}").toIterator
      }
    )
    println(dataRDD5.collect().toArray.mkString(";"))
index=0,max=2;index=1,max=4





dataRDD5: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[5] at mapPartitionsWithIndex at <console>:27
val dataRDD6 = dataRDD.mapPartitionsWithIndex(
      (index, datas) => {
        index match {
          case 1 => Array(1, datas.toArray).toIterator
          case _ => Nil.toIterator
        }
      }
    )
    val arr = dataRDD6.collect().toArray
    val index = arr(0)
    val data = arr(1).asInstanceOf[Array[Int]]
    println(s"index=${index},data=${data.mkString(",")}")
index=1,data=3,4





dataRDD6: org.apache.spark.rdd.RDD[Any] = MapPartitionsRDD[7] at mapPartitionsWithIndex at <console>:27
arr: Array[Any] = Array(1, Array(3, 4))
index: Any = 1
data: Array[Int] = Array(3, 4)

总结

map和mapPartitionsWithIndex两种方法差异:

  • 数据处理角度
    • Map算子是分区内一个数据一个数据的执行,类似于串行操作。而mapPartitions算子是以分区为单位进行批处理操作
  • 功能的角度
    • Map 算子主要目的将数据源中的数据进行转换和改变,但是不会减少或增多数据
    • MapPartitions 算子需要传递一个迭代器,返回一个迭代器,没有要求的元素的个数保持不变,所以可以增加或减少数据
  • 性能的角度
    • Map算子因为类似于串行操作,所以性能比较低
    • mapPartitions 算子类似于批处理,所以性能较高。单会长时间占用内存,在内存有限的情况下,不推荐使用

                                2021-11-17 于南京市江宁区九龙湖

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhgkhkkj
系列文章
更多 icon
同类精品
更多 icon
继续加载